/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
* Copyright (c) 1991-1999 University of Maryland at College Park
* Copyright (c) 2007-2012 Zmanda, Inc. All Rights Reserved.
* Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
* All Rights Reserved.
*
* Permission to use, copy, modify, distribute, and sell this software and its
* documentation for any purpose is hereby granted without fee, provided that
* the above copyright notice appear in all copies and that both that
* copyright notice and this permission notice appear in supporting
* documentation, and that the name of U.M. not be used in advertising or
* publicity pertaining to distribution of the software without specific,
* written prior permission. U.M. makes no representations about the
* suitability of this software for any purpose. It is provided "as is"
* without express or implied warranty.
*
* U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
* BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* Authors: the Amanda Development Team. Its members are listed in a
* file named AUTHORS, in the root directory of this distribution.
*/
/*
* $Id: amandad.c,v 1.18 2006/08/21 20:17:09 martinea Exp $
*
* handle client-host side of Amanda network communications, including
* security checks, execution of the proper service, and acking the
* master side
*/
#include "amanda.h"
#include "amandad.h"
#include "clock.h"
#include "event.h"
#include "amfeatures.h"
#include "packet.h"
#include "version.h"
#include "security.h"
#include "stream.h"
#include "amutil.h"
#include "conffile.h"
#include "shm-ring.h"
/* Timeout, in seconds, armed in s_sendack() while waiting for the service's REP data */
#define REP_TIMEOUT (6*60*60) /* secs for service to reply */
/* Timeout handed to security_recvpkt() in s_sendrep() while waiting for the ACK of our REP */
#define ACK_TIMEOUT 10 /* XXX should be configurable */
/* NOTE(review): presumably the descriptor slot used for the child's stderr,
 * one past the DATA_FD_COUNT data descriptors -- confirm against its users. */
#define STDERR_PIPE (DATA_FD_COUNT + 1)
/*
 * Emit a message through dbprintf() only when the verbosity level `i`
 * is less than or equal to the debug_amandad setting.
 */
#define amandad_debug(i, ...) do { \
if ((i) <= debug_amandad) { \
dbprintf(__VA_ARGS__); \
} \
} while (0)
/*
 * These are the actions for entering the state machine, and also the
 * values a state function hands back to state_machine():
 *
 *   inputs to a state:   A_START (initial kick from protocol_accept()),
 *                        A_RECVPKT, A_RECVREP, A_TIMEOUT
 *   results of a state:  A_PENDING (wait for more input), A_CONTINUE
 *                        (state changed, run again), A_SENDNAK (NAK the
 *                        packet), A_FINISH (delete the service)
 */
typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
A_SENDNAK, A_TIMEOUT } action_t;
/*
 * This is a state in the state machine. It is a function pointer to
 * the function that actually implements the state. A state function
 * receives the service, the action that triggered it and the packet
 * (which may be NULL), and returns the action state_machine() should
 * take next.
 */
struct active_service;
typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
/* string that we scan for in sendbackup's MESG stream */
static const char info_end_str[] = "sendbackup: info end\n";
/* length of info_end_str without its terminating '\0' */
#define INFO_END_LEN (sizeof(info_end_str)-1)
/*
 * Here are the services that we allow.
 * Must be in the same order as services[], so that an enumerator can be
 * used directly as an index into that table.
 */
typedef enum {
SERVICE_NOOP,
SERVICE_SENDSIZE,
SERVICE_SENDBACKUP,
SERVICE_SELFCHECK,
SERVICE_AMINDEXD,
SERVICE_AMIDXTAPED,
SERVICE_AMDUMPD,
SERVICE_SENDDISCOVER,
SERVICE_RESTORE,
SERVICE_AMBACKUPD
} service_t;
/*
 * Table of known services. `active` is the default enablement: main()
 * clears or sets these flags from the command line, and
 * protocol_accept() only launches a service whose flag is 1.
 */
static struct services {
char *name; /* service name as it appears in a REQ packet */
int active; /* non-zero when this service may be launched */
service_t service; /* matching service_t identifier */
} services[] = {
{ "noop", 1, SERVICE_NOOP },
{ "sendsize", 1, SERVICE_SENDSIZE },
{ "sendbackup", 1, SERVICE_SENDBACKUP },
{ "selfcheck", 1, SERVICE_SELFCHECK },
{ "amindexd", 0, SERVICE_AMINDEXD },
{ "amidxtaped", 0, SERVICE_AMIDXTAPED },
{ "amdumpd", 0, SERVICE_AMDUMPD },
{ "senddiscover", 1, SERVICE_SENDDISCOVER },
{ "restore", 1, SERVICE_RESTORE },
{ "ambackupd", 0, SERVICE_AMBACKUPD }
};
/* number of entries in services[] */
#define NSERVICES G_N_ELEMENTS(services)
/*
 * This structure describes an active running service.
 *
 * An active service is something running that we have received
 * a request for. This structure holds info on that service, including
 * file descriptors for data, etc, as well as the security handle
 * for communications with the amanda server.
 */
struct active_service {
service_t service; /* which service this is */
char *cmd; /* name of command we ran */
char *arguments; /* arguments we sent it */
security_handle_t *security_handle; /* remote server */
state_t state; /* how far this has progressed */
pid_t pid; /* pid of subprocess */
int send_partial_reply; /* send PREP packet */
int reqfd; /* pipe to write requests */
int repfd; /* pipe to read replies */
int errfd; /* pipe to read stderr */
event_handle_t *ev_repfd; /* read event handle for repfd */
event_handle_t *ev_reptimeout; /* timeout for rep data */
event_handle_t *ev_errfd; /* read event handle for errfd */
pkt_t rep_pkt; /* rep packet we're sending out */
char *errbuf; /* buffer to read the err into */
char *repbuf; /* buffer to read the rep into */
size_t bufsize; /* allocated capacity of repbuf, in bytes */
size_t repbufsize; /* bytes of repbuf in use, not counting the final '\0' */
int repretry; /* times we'll retry sending the rep */
int seen_info_end; /* have we seen "sendbackup info end\n"? */
char info_end_buf[INFO_END_LEN]; /* last few bytes read, used for scanning for info end */
char *data_shm_control_name; /* from OPTIONS line */
shm_ring_t *shm_ring;
GThread *thread;
time_t last_prep_time; /* when s_repwait() last sent a P_PREP packet */
/*
 * General user streams to the process, and their equivalent
 * network streams.
 */
struct datafd_handle {
int fd_read; /* pipe to child process */
int fd_write; /* pipe to child process */
event_handle_t *ev_read; /* its read event handle */
event_handle_t *ev_write; /* its write event handle */
security_stream_t *netfd; /* stream to amanda server */
shm_ring_t *shm_ring; /* when reading from shm-ring */
struct active_service *as; /* pointer back to our enclosure */
} data[DATA_FD_COUNT];
char databuf[NETWORK_BLOCK_BYTES]; /* buffer to relay netfd data in */
};
/*
 * Queue of outstanding requests that we are running.
 * Each element's data is a struct active_service *.
 */
GSList *serviceq = NULL;
/* periodic timer that runs exit_check(); released by protocol_accept()
 * when the connection closes and exit_on_qlength is set */
static event_handle_t *exit_event;
/* set by main() for the rsh, ssh, krb5, local, bsdtcp and ssl auth types */
static int exit_on_qlength = 0;
/* name of the security driver in use ("bsdtcp" when none was given) */
static char *auth = NULL;
/* KENCRYPT negotiation state, advanced by s_processrep() and s_ackwait() */
static kencrypt_type amandad_kencrypt = KENCRYPT_NONE;
/* command-line error detected by main(); protocol_accept() NAKs every
 * request with it when non-NULL */
static char *global_error = NULL;
/* Forward declarations */
int main(int argc, char **argv);
static int allocstream(struct active_service *, int);
static void exit_check(void *);
static void protocol_accept(security_handle_t *, pkt_t *);
static void state_machine(struct active_service *, action_t, pkt_t *);
/* state functions (see state_t) */
static action_t s_sendack(struct active_service *, action_t, pkt_t *);
static action_t s_repwait(struct active_service *, action_t, pkt_t *);
static action_t s_processrep(struct active_service *, action_t, pkt_t *);
static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
/* event and security-layer callbacks */
static void repfd_recv(void *);
static void process_errfd(void *cookie);
static void errfd_recv(void *);
static void timeout_repfd(void *);
static void protocol_recv(void *, pkt_t *, security_status_t);
static void process_readnetfd(void *);
static void process_writenetfd(void *, void *, ssize_t);
/* service lifetime and helpers */
static struct active_service *service_new(security_handle_t *,
const char *, service_t, const char *);
static void service_delete(struct active_service *);
static int writebuf(struct active_service *, const void *, size_t);
static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
static char *amandad_get_security_conf (char *, void *);
static const char *state2str(state_t);
static const char *action2str(action_t);
static gpointer shm_ring_thread(gpointer dh);
/*
 * Entry point of amandad.
 *
 * Sets up the process (locale, debug log, privileges, configuration),
 * parses the ad-hoc command line, selects a security driver, registers
 * protocol_accept() for incoming connections and then runs the event loop.
 *
 * Command line:
 *   -auth=<type>   security driver to use (default "bsdtcp"); "local",
 *                  "rsh" and "ssh" enable every service
 *   -no-exit       keep running when no request is outstanding
 *   -udp=<port>    bind directly to a UDP port (debugging)
 *   -tcp=<port>    bind to a TCP port and accept one connection (debugging)
 *   <service>...   restrict the launchable services to those listed;
 *                  "amdump" is an alias for noop, sendsize, sendbackup
 *                  and selfcheck
 *
 * An unknown service name is not fatal here: it is recorded in
 * global_error and reported to the server by protocol_accept().
 *
 * Returns 0 when the event loop finishes; fatal setup problems go through
 * error(), which does not return.
 */
int
main(
    int		argc,
    char **	argv)
{
    int i;
    guint j;
    int have_services;
    int in, out;
    const security_driver_t *secdrv;
    int no_exit = 0;
    char *pgm = "amandad";		/* in case argv[0] is not set */
#if defined(USE_REUSEADDR)
    const int on = 1;
    int r;
#endif

    glib_init();

    /*
     * Configure program for internationalization:
     *   1) Only set the message locale for now.
     *   2) Set textdomain for all amanda related programs to "amanda"
     *      We don't want to be forced to support dozens of message catalogs.
     */
    setlocale(LC_MESSAGES, "C");
    textdomain("amanda");

    safe_fd(-1, 0);
    safe_cd();

    /*
     * Nexenta needs the SUN_PERSONALITY env variable to be unset, otherwise
     * the Sun version of tar in /usr/sun/sbin/tar is called instead.
     *
     * On other operating systems this will have no effect.
     */
#ifdef HAVE_UNSETENV
    unsetenv("SUN_PERSONALITY");
#endif

    /*
     * When called via inetd, it is not uncommon to forget to put the
     * argv[0] value on the config line.  On some systems (e.g. Solaris)
     * this causes argv and/or argv[0] to be NULL, so we have to be
     * careful getting our name.
     */
    if ((argv != NULL) && (argv[0] != NULL)) {
	pgm = basename(argv[0]);	/* strip off leading path to get debug name */
    }

    set_pname(pgm);
    dbopen(DBG_SUBDIR_AMANDAD);

    if (argv == NULL) {
	error(_("argv == NULL\n"));
	/*NOTREACHED*/
    }

    if (argc > 1 && argv[1] != NULL && g_str_equal(argv[1], "--version")) {
	printf("amandad-%s\n", VERSION);
	return (0);
    }

    /* Don't die when child closes pipe */
    signal(SIGPIPE, SIG_IGN);

    /* Parse the configuration; we'll handle errors later */
    config_init(CONFIG_INIT_CLIENT|CONFIG_INIT_GLOBAL, NULL);

    if (geteuid() == 0) {
	check_running_as(RUNNING_AS_ROOT);
	initgroups(CLIENT_LOGIN, get_client_gid());
	if(setgid(get_client_gid()) != 0) { error("Can't set gid"); };
	if(setegid(get_client_gid()) != 0) { error("Can't set egid"); };
	if(seteuid(get_client_uid()) != 0) { error("Can't set euid"); };
    } else {
	check_running_as(RUNNING_AS_CLIENT_LOGIN);
    }

    add_amanda_log_handler(amanda_log_stderr);
    add_amanda_log_handler(amanda_log_syslog);

    /*
     * ad-hoc argument parsing
     *
     * We accept	-auth=[authentication type]
     *			-no-exit
     *			-tcp=[port]
     *			-udp=[port]
     * We also add a list of services that amandad can launch
     */
    secdrv = NULL;
    in = 0; out = 1;		/* default to stdin/stdout */
    have_services = 0;
    for (i = 1; i < argc; i++) {
	g_debug("argc[%d] = %s", i, argv[i]);
	/*
	 * Get a driver for a security type specified after -auth=
	 */
	if (g_str_has_prefix(argv[i], "-auth=")) {
	    argv[i] += strlen("-auth=");
	    secdrv = security_getdriver(argv[i]);
	    auth = argv[i];
	    if (secdrv == NULL) {
		error(_("no driver for security type '%s'\n"), argv[i]);
		/*NOTREACHED*/
	    }
	    if (g_str_equal(auth, "local") ||
		g_str_equal(auth, "rsh") ||
		g_str_equal(auth, "ssh")) {
		/* these auth types may launch every service */
		for (j = 0; j < NSERVICES; j++) {
		    services[j].active = 1;
		}
	    }
	    continue;
	}

	/*
	 * If -no-exit is specified, always run even after requests have
	 * been satisfied.
	 */
	else if (g_str_equal(argv[i], "-no-exit")) {
	    no_exit = 1;
	    continue;
	}

	/*
	 * Allow us to directly bind to a udp port for debugging.
	 * This may only apply to some security types.
	 */
	else if (g_str_has_prefix(argv[i], "-udp=")) {
#ifdef WORKING_IPV6
	    struct sockaddr_in6 sin;
#else
	    struct sockaddr_in sin;
#endif

	    argv[i] += strlen("-udp=");
#ifdef WORKING_IPV6
	    in = out = socket(AF_INET6, SOCK_DGRAM, 0);
#else
	    in = out = socket(AF_INET, SOCK_DGRAM, 0);
#endif
	    if (in < 0) {
		error(_("can't create dgram socket: %s\n"), strerror(errno));
		/*NOTREACHED*/
	    }
#ifdef USE_REUSEADDR
	    r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
		(void *)&on, (socklen_t_equiv)sizeof(on));
	    if (r < 0) {
		dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
			  strerror(errno));
	    }
#endif

	    /* clear the fields we do not set explicitly (flowinfo, scope id, padding) */
	    memset(&sin, 0, sizeof(sin));
#ifdef WORKING_IPV6
	    sin.sin6_family = (sa_family_t)AF_INET6;
	    sin.sin6_addr = in6addr_any;
	    sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
#else
	    sin.sin_family = (sa_family_t)AF_INET;
	    sin.sin_addr.s_addr = INADDR_ANY;
	    sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
#endif
	    if (bind(in, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
		error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
		    strerror(errno));
		/*NOTREACHED*/
	    }
	}
	/*
	 * Ditto for tcp ports.
	 */
	else if (g_str_has_prefix(argv[i], "-tcp=")) {
#ifdef WORKING_IPV6
	    struct sockaddr_in6 sin;
#else
	    struct sockaddr_in sin;
#endif
	    int sock;
	    socklen_t_equiv n;

	    argv[i] += strlen("-tcp=");
#ifdef WORKING_IPV6
	    sock = socket(AF_INET6, SOCK_STREAM, 0);
#else
	    sock = socket(AF_INET, SOCK_STREAM, 0);
#endif
	    if (sock < 0) {
		error(_("can't create tcp socket: %s\n"), strerror(errno));
		/*NOTREACHED*/
	    }
#ifdef USE_REUSEADDR
	    r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
		(void *)&on, (socklen_t_equiv)sizeof(on));
	    if (r < 0) {
		dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
			  strerror(errno));
	    }
#endif
	    /* clear the fields we do not set explicitly (flowinfo, scope id, padding) */
	    memset(&sin, 0, sizeof(sin));
#ifdef WORKING_IPV6
	    sin.sin6_family = (sa_family_t)AF_INET6;
	    sin.sin6_addr = in6addr_any;
	    sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
#else
	    sin.sin_family = (sa_family_t)AF_INET;
	    sin.sin_addr.s_addr = INADDR_ANY;
	    sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
#endif
	    if (bind(sock, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
		error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
		    strerror(errno));
		/*NOTREACHED*/
	    }
	    if (listen(sock, 10) < 0) {
		error(_("can't listen on tcp socket: %s\n"), strerror(errno));
		/*NOTREACHED*/
	    }
	    n = (socklen_t_equiv)sizeof(sin);
	    in = out = accept(sock, (struct sockaddr *)&sin, &n);
	    if (in < 0) {
		/* without a connection there is nothing to serve */
		error(_("can't accept tcp connection: %s\n"), strerror(errno));
		/*NOTREACHED*/
	    }
	}
	/*
	 * It must be a service name
	 */
	else {
	    /* the first service argument replaces the default set */
	    if (!have_services) {
		for (j = 0; j < NSERVICES; j++)
		    services[j].active = 0;
	    }
	    have_services = 1;

	    if (g_str_equal(argv[i], "amdump")) {
		/* services[] is indexed by service_t (same order) */
		services[SERVICE_NOOP].active = 1;
		services[SERVICE_SENDSIZE].active = 1;
		services[SERVICE_SENDBACKUP].active = 1;
		services[SERVICE_SELFCHECK].active = 1;
	    }
	    else {
		for (j = 0; j < NSERVICES; j++)
		    if (g_str_equal(services[j].name, argv[i]))
			break;
		if (j == NSERVICES) {
		    /*
		     * Unknown service: remember the error for
		     * protocol_accept() and do NOT touch services[j],
		     * which would be one past the end of the table.
		     */
		    g_free(global_error);
		    global_error = g_strdup_printf("invalid '%s' service as argument to amandad", argv[i]);
		    g_debug("%s\n", global_error);
		} else {
		    services[j].active = 1;
		}
	    }
	}
    }

    /*
     * If no security type specified, use BSDTCP
     */
    if (secdrv == NULL) {
	secdrv = security_getdriver("BSDTCP");
	auth = "bsdtcp";
	if (secdrv == NULL) {
	    error(_("no driver for default security type 'BSDTCP'\n"));
	    /*NOTREACHED*/
	}
    }

    if(strcasecmp(auth, "rsh") == 0 ||
       strcasecmp(auth, "ssh") == 0 ||
       strcasecmp(auth, "krb5") == 0 ||
       strcasecmp(auth, "local") == 0 ||
       strcasecmp(auth, "bsdtcp") == 0 ||
       strcasecmp(auth, "ssl") == 0) {
	exit_on_qlength = 1;
    }

#ifndef SINGLE_USERID
    if (getuid() == 0) {
	if (strcasecmp(auth, "krb5") != 0) {
	    struct passwd *pwd;
	    /* lookup our local user name */
	    if ((pwd = getpwnam(CLIENT_LOGIN)) == NULL) {
		error(_("getpwnam(%s) failed."), CLIENT_LOGIN);
	    }
	    if (pwd->pw_uid != 0) {
		error(_("'amandad' must be run as user '%s' when using '%s' authentication"),
		      CLIENT_LOGIN, auth);
	    }
	}
    } else {
	if (strcasecmp(auth, "krb5") == 0) {
	    error(_("'amandad' must be run as user 'root' when using 'krb5' authentication"));
	}
    }
#endif

    /* initialize */

    startclock();

    dbprintf(_("version %s\n"), VERSION);
    for (i = 0; version_info[i] != NULL; i++) {
	dbprintf("    %s", version_info[i]);
    }

    if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
	dbprintf(_("WARNING: argv[0] not defined: check inetd.conf\n"));
    }

    /* krb5 require the euid to be 0 */
    if (strcasecmp(auth, "krb5") == 0) {
	if(seteuid((uid_t)0) != 0) { error("Can't set euid to 0"); };
    }

    /*
     * Schedule to call protocol_accept() when new security handles
     * are created on stdin.
     */
    security_accept(secdrv, amandad_get_security_conf, in, out, protocol_accept, NULL);

    /*
     * Schedule an event that will try to exit every 30 seconds if there
     * are no requests outstanding.
     */
    exit_event = event_create((event_id_t)30, EV_TIME, exit_check, &no_exit);
    event_activate(exit_event);

    /*
     * Call event_loop() with an arg of 0, telling it to block until all
     * events are completed.
     */
    event_loop(0);

    close(in);
    close(out);
    dbclose();
    return(0);
}
/*
 * Periodic timer callback: terminate the process once no service is
 * queued any more, unless the caller asked us to stay alive.
 *
 * cookie points at an int flag; a non-zero value means "never exit".
 */
static void
exit_check(
    void *	cookie)
{
    const int *stay_alive;

    assert(cookie != NULL);
    stay_alive = (int *)cookie;

    /*
     * Nothing to do while requests are still being served, or when
     * we were told to run forever.
     */
    if (g_slist_length(serviceq) > 0 || *stay_alive) {
	return;
    }

    g_debug("timeout exit");
    dbclose();
    exit(0);
}
/*
 * Handles new incoming protocol handles. This is a callback for
 * security_accept(), which gets called when new handles are detected.
 *
 * handle: the new security handle, or NULL when the connection is closed.
 * pkt:    the first packet received on the handle, or NULL when the
 *         connection had a problem.
 *
 * A valid "SERVICE <name> <arguments>" REQ for an enabled, executable
 * service creates a new active_service and starts its state machine in
 * s_sendack. A REQ that duplicates a running service is just ACKed.
 * Every other outcome answers with a NAK (or nothing, for a non-REQ
 * packet) and closes the handle.
 */
static void
protocol_accept(
security_handle_t * handle,
pkt_t * pkt)
{
pkt_t pkt_out;
GSList *iter;
struct active_service *as;
char *pktbody, *tok, *service, *arguments;
char *service_path = NULL;
GSList *errlist = NULL;
guint i;
char *peer_name;
pkt_out.body = NULL;
/*
 * If handle is NULL, then the connection is closed.
 */
if (handle == NULL) {
if (exit_on_qlength && exit_event) {
/* remove the timeout, we will exit once the service terminate */
event_release(exit_event);
exit_event = NULL;
}
return;
}
/* If we have global errors, let the remote system know immediately.
 * Unfortunately, we only get one ERROR line, so if there
 * are multiple errors, we just show the first.
 */
if (global_error) {
pkt_init(&pkt_out, P_NAK, "ERROR %s", global_error);
do_sendpkt(handle, &pkt_out);
amfree(pkt_out.body);
security_close(handle);
return;
}
/*
 * If we have errors (not warnings) from the config file, let the remote system
 * know immediately. Unfortunately, we only get one ERROR line, so if there
 * are multiple errors, we just show the first.
 */
if (config_errors(&errlist) >= CFGERR_ERRORS) {
GSList *iter = errlist;
char *errmsg;
gboolean multiple_errors = FALSE;
if (iter) {
errmsg = (char *)iter->data;
if (iter->next)
multiple_errors = TRUE;
} else {
errmsg = "(no error message)";
}
pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
multiple_errors? _(" (additional errors not displayed)"):"");
do_sendpkt(handle, &pkt_out);
amfree(pkt_out.body);
security_close(handle);
return;
}
/* log who we are talking to; the returned string is ours to free */
peer_name = security_get_authenticated_peer_name(handle);
g_debug("authenticated peer name is '%s'", peer_name);
amfree(peer_name);
/*
 * If pkt is NULL, then there was a problem with the new connection.
 */
if (pkt == NULL) {
dbprintf(_("accept error: %s\n"), security_geterror(handle));
pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
do_sendpkt(handle, &pkt_out);
amfree(pkt_out.body);
security_close(handle);
return;
}
dbprintf(_("accept recv %s pkt:\n<<<<<\n%s>>>>>\n"),
pkt_type2str(pkt->type), pkt->body);
/*
 * If this is not a REQ packet, just forget about it.
 */
if (pkt->type != P_REQ) {
dbprintf(_("received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n"),
pkt_type2str(pkt->type), pkt->body);
security_close(handle);
return;
}
/* from here on, everything allocated is released at the labels below */
pktbody = service = arguments = NULL;
as = NULL;
/*
 * Parse out the service and arguments
 */
/* strtok() modifies its input, so work on a private copy of the body */
pktbody = g_strdup(pkt->body);
tok = strtok(pktbody, " ");
if (tok == NULL)
goto badreq;
if (!g_str_equal(tok, "SERVICE"))
goto badreq;
tok = strtok(NULL, " \n");
if (tok == NULL)
goto badreq;
service = g_strdup(tok);
/* we call everything else 'arguments' */
tok = strtok(NULL, "");
if (tok == NULL)
goto badreq;
arguments = g_strdup(tok);
/* see if it's one we allow */
for (i = 0; i < NSERVICES; i++)
if (services[i].active == 1 && g_str_equal(services[i].name, service))
break;
if (i == NSERVICES) {
dbprintf(_("%s: invalid service\n"), service);
pkt_init(&pkt_out, P_NAK, _("ERROR %s: invalid service, add '%s' as argument to amandad\n"), service, service);
goto send_pkt_out;
}
/* the service program lives in amlibexecdir and must be executable */
service_path = g_strjoin(NULL, amlibexecdir, "/", service, NULL);
if (access(service_path, X_OK) < 0) {
dbprintf(_("can't execute %s: %s\n"), service_path, strerror(errno));
pkt_init(&pkt_out, P_NAK,
_("ERROR execute access to '%s' denied\n"),
service_path);
goto send_pkt_out;
}
/* see if its already running */
for (iter = serviceq; iter != NULL; iter = g_slist_next(iter)) {
as = (struct active_service *)iter->data;
if (g_str_equal(as->cmd, service_path) &&
g_str_equal(as->arguments, arguments)) {
dbprintf(_("%s %s: already running, acking req\n"),
service, arguments);
pkt_init_empty(&pkt_out, P_ACK);
/* `as` belongs to the running request: skip service_delete() */
goto send_pkt_out_no_delete;
}
}
/*
 * create a new service instance, and send the arguments down
 * the request pipe.
 */
dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
as = service_new(handle, service_path, services[i].service, arguments);
if (writebuf(as, arguments, strlen(arguments)) < 0) {
/* capture the message now, before later calls can change errno */
const char *errmsg = strerror(errno);
dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
pkt_init(&pkt_out, P_NAK, _("ERROR error writing arguments to %s: %s\n"),
service, errmsg);
goto send_pkt_out;
}
/* all arguments written: close our end of the request pipe */
aclose(as->reqfd);
amfree(pktbody);
amfree(service);
amfree(service_path);
amfree(arguments);
/*
 * Move to the sendack state, and start up the state
 * machine.
 */
as->state = s_sendack;
state_machine(as, A_START, NULL);
return;
/* malformed REQ: build the NAK, then fall through to the common exit */
badreq:
pkt_init(&pkt_out, P_NAK, _("ERROR invalid REQ\n"));
dbprintf(_("received invalid %s packet:\n<<<<<\n%s>>>>>\n\n"),
pkt_type2str(pkt->type), pkt->body);
/* error exit: drop the service created for this request, if any */
send_pkt_out:
if(as)
service_delete(as);
/* common exit: free the parse buffers, send pkt_out, close the handle */
send_pkt_out_no_delete:
amfree(pktbody);
amfree(service_path);
amfree(service);
amfree(arguments);
do_sendpkt(handle, &pkt_out);
security_close(handle);
amfree(pkt_out.body);
}
/*
 * Run the state machine of one service.
 *
 * The current state function is called with the given action and packet
 * and its result decides what happens next:
 *   A_CONTINUE  the state changed; call the (new) state function again
 *   A_PENDING   the state is waiting for input; return to the caller
 *   A_SENDNAK   the packet was unexpected; NAK it and keep listening
 *   A_FINISH    the service is done; delete it
 */
static void
state_machine(
    struct active_service *	as,
    action_t			action,
    pkt_t *			pkt)
{
    amandad_debug(1, _("state_machine: %p entering\n"), as);

    while (1) {
	state_t handler = as->state;
	action_t outcome;

	amandad_debug(1, _("state_machine: %p curstate=%s action=%s\n"), as,
		      state2str(handler), action2str(action));
	outcome = (*handler)(as, action, pkt);
	amandad_debug(1, _("state_machine: %p curstate=%s returned %s (nextstate=%s)\n"),
		      as, state2str(handler), action2str(outcome),
		      state2str(as->state));

	if (outcome == A_CONTINUE) {
	    /* service has switched states: go around again */
	    continue;
	}

	if (outcome == A_PENDING) {
	    /* state has queued up and is now blocking on input */
	    amandad_debug(1, _("state_machine: %p leaving (A_PENDING)\n"), as);
	    return;
	}

	if (outcome == A_SENDNAK) {
	    /* the state rejected the packet: tell the peer and wait again */
	    pkt_t nak;

	    dbprintf(_("received unexpected %s packet\n"),
		     pkt_type2str(pkt->type));
	    dbprintf(_("<<<<<\n%s----\n\n"), pkt->body);
	    pkt_init(&nak, P_NAK, _("ERROR unexpected packet type %s\n"),
		     pkt_type2str(pkt->type));
	    do_sendpkt(as->security_handle, &nak);
	    amfree(nak.body);
	    security_recvpkt(as->security_handle, protocol_recv, as, -1);
	    amandad_debug(1, _("state_machine: %p leaving (A_SENDNAK)\n"), as);
	    return;
	}

	if (outcome == A_FINISH) {
	    /* service is done: remove it and finish */
	    amandad_debug(1, _("state_machine: %p leaving (A_FINISH)\n"), as);
	    service_delete(as);
	    return;
	}

	/* a state must never return anything else */
	assert(0);
    }
    /*NOTREACHED*/
}
/*
 * First state of a service: acknowledge the REQ.
 *
 * When the ACK cannot be sent the service is finished. Otherwise the
 * service moves to s_repwait and we start watching the child's reply and
 * stderr pipes, arm the reply timeout, and keep listening for packets
 * from the server. Neither `action` nor `pkt` is used.
 */
static action_t
s_sendack(
    struct active_service *	as,
    action_t			action,
    pkt_t *			pkt)
{
    pkt_t ack_pkt;
    ssize_t sent;

    (void)pkt;		/* Quiet unused parameter warning */
    (void)action;	/* Quiet unused parameter warning */

    pkt_init_empty(&ack_pkt, P_ACK);
    sent = do_sendpkt(as->security_handle, &ack_pkt);
    if (sent < 0) {
	dbprintf(_("error sending ACK: %s\n"),
		 security_geterror(as->security_handle));
    }
    amfree(ack_pkt.body);
    if (sent < 0) {
	return A_FINISH;
    }

    /*
     * The ACK is out, so the next thing we expect is REP data from the
     * subprocess.  The server may also poll us with packets if that
     * takes a long time, and a timer guards against a service that
     * never answers.
     */
    as->state = s_repwait;

    as->ev_repfd = event_create((event_id_t)as->repfd, EV_READFD,
				repfd_recv, as);
    as->ev_reptimeout = event_create(REP_TIMEOUT, EV_TIME,
				     timeout_repfd, as);
    as->errbuf = NULL;
    as->ev_errfd = event_create((event_id_t)as->errfd, EV_READFD,
				errfd_recv, as);

    event_activate(as->ev_repfd);
    event_activate(as->ev_reptimeout);
    event_activate(as->ev_errfd);

    security_recvpkt(as->security_handle, protocol_recv, as, -1);
    return A_PENDING;
}
/**
 * Make sure the reply buffer can take `size` more bytes plus the final '\0'.
 *
 * The buffer starts out at NETWORK_BLOCK_BYTES and is doubled until it is
 * large enough. This function RELIES on the 'repbufsize' (bytes in use) and
 * 'bufsize' (bytes allocated) members being correct. For added safety, a
 * buffer whose declared size is 0 must also have a used size of 0 and a
 * NULL data pointer.
 *
 * @param as: the active_service
 * @param size: the size we need to write (without the final '\0')
 * @returns: TRUE if the buffer had to be expanded, FALSE otherwise
 */
static gboolean expand_reply_buffer(struct active_service *as, gsize size)
{
    const gsize needed = as->repbufsize + size + 1;
    gsize capacity = as->bufsize;

    /* already big enough: nothing to do */
    if (capacity >= needed) {
	return FALSE;
    }

    if (capacity == 0) {
	/* first allocation: the buffer must really be empty */
	g_assert(as->repbufsize == 0);
	g_assert(as->repbuf == NULL);
	capacity = NETWORK_BLOCK_BYTES;
    }

    for (; capacity < needed; capacity *= 2) {
	/* keep doubling */
    }

    as->repbuf = g_realloc(as->repbuf, capacity);
    as->bufsize = capacity;
    return TRUE;
}
/*
 * This is the repwait state. We have responded to the initial REQ with
 * an ACK, and we are now waiting for the process we spawned to pass us
 * data to send in a REP.
 *
 * Actions handled:
 *   A_RECVPKT  a packet arrived: a duplicate REQ is ACKed, anything
 *              else is NAKed
 *   A_TIMEOUT  the service did not reply in time: send a NAK and finish
 *   A_RECVREP  the reply pipe is readable: append the data to repbuf;
 *              on EOF collect the child's exit status and move on to
 *              s_processrep
 */
static action_t
s_repwait(
struct active_service * as,
action_t action,
pkt_t * pkt)
{
ssize_t n;
char *what;
char *msg;
int code = 0;
int pid;
amwait_t retstat;
gboolean expanded;
/*
 * We normally shouldn't receive any packets while waiting
 * for our REP data, but in some cases we do.
 */
if (action == A_RECVPKT) {
assert(pkt != NULL);
/*
 * Another req for something that's running. Just send an ACK
 * and go back and wait for more data.
 */
if (pkt->type == P_REQ) {
dbprintf(_("received dup P_REQ packet, ACKing it\n"));
amfree(as->rep_pkt.body);
pkt_init_empty(&as->rep_pkt, P_ACK);
do_sendpkt(as->security_handle, &as->rep_pkt);
security_recvpkt(as->security_handle, protocol_recv, as, -1);
return (A_PENDING);
}
/* something unexpected. Nak it */
return (A_SENDNAK);
}
if (action == A_TIMEOUT) {
amfree(as->rep_pkt.body);
pkt_init(&as->rep_pkt, P_NAK, _("ERROR timeout on reply pipe\n"));
dbprintf(_("%s timed out waiting for REP data\n"), as->cmd);
do_sendpkt(as->security_handle, &as->rep_pkt);
return (A_FINISH);
}
assert(action == A_RECVREP);
/*
 * Ensure we have at least NETWORK_BLOCK_BYTES available in the reply buffer
 */
expanded = expand_reply_buffer(as, NETWORK_BLOCK_BYTES);
/* read after the data we already hold, keeping one byte for the '\0';
 * retry when the read is interrupted or would block */
do {
n = read(as->repfd, as->repbuf + as->repbufsize,
as->bufsize - as->repbufsize - 1);
} while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
if (n < 0) {
const char *errstr = strerror(errno);
dbprintf(_("read error on reply pipe: %s\n"), errstr);
amfree(as->rep_pkt.body);
pkt_init(&as->rep_pkt, P_NAK, _("ERROR read error on reply pipe: %s\n"),
errstr);
do_sendpkt(as->security_handle, &as->rep_pkt);
return (A_FINISH);
}
/*
 * At this point, we know that n >= 0. The two operations below are
 * therefore pretty safe even if n is 0, in which case the current reply
 * buffer won't be changed at all.
 */
as->repbufsize += n;
as->repbuf[as->repbufsize] = '\0';
/* If end of service, wait for process status */
if (n == 0) {
pid = waitpid(as->pid, &retstat, WNOHANG);
/*
 * For these four services, poll for the exit status for a while:
 * up to 15 sleeps, starting at 100000 ns and doubling each time.
 */
if (as->service == SERVICE_NOOP ||
as->service == SERVICE_SENDSIZE ||
as->service == SERVICE_SELFCHECK ||
as->service == SERVICE_SENDDISCOVER) {
long long delay = 100000; /* nanoseconds */
struct timespec tdelay;
int t = 0;
while (t < 15 && pid == 0) {
tdelay.tv_sec = delay/1000000000;
tdelay.tv_nsec = delay%1000000000;
nanosleep(&tdelay, NULL);
delay *= 2;
t++;
pid = waitpid(as->pid, &retstat, WNOHANG);
}
}
process_errfd(as);
/* one last try if the child had not been reaped yet */
if (pid == 0)
pid = waitpid(as->pid, &retstat, WNOHANG);
if (pid > 0) {
/* `what` stays NULL when the child exited normally with status 0 */
what = NULL;
if (! WIFEXITED(retstat)) {
what = _("signal");
code = WTERMSIG(retstat);
} else if (WEXITSTATUS(retstat) != 0) {
what = _("code");
code = WEXITSTATUS(retstat);
}
if (what) {
dbprintf(_("service %s failed: pid %u exited with %s %d\n"),
(as->cmd)?as->cmd:_("??UNKONWN??"),
(unsigned)as->pid,
what, code);
/* append an ERROR line to the reply so the server sees the failure */
msg = g_strdup_printf(
_("ERROR service %s failed: pid %u exited with %s %d\n"),
(as->cmd)?as->cmd:_("??UNKONWN??"), (unsigned)as->pid,
what, code);
expand_reply_buffer(as, strlen(msg));
strcpy(as->repbuf + as->repbufsize, msg);
as->repbufsize += strlen(msg);
amfree(msg);
}
}
}
/*
 * If we got some data, go back and wait for more, or EOF.
 */
if (n > 0) {
/* send a partial reply (PREP) of what we have so far, but only when
 * the service asked for it, the buffer was not just expanded, and
 * more than a second has passed since the previous PREP */
if (!expanded && as->send_partial_reply &&
as->last_prep_time < time(NULL)-1) {
amfree(as->rep_pkt.body);
pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
do_sendpkt(as->security_handle, &as->rep_pkt);
as->last_prep_time = time(NULL);
amfree(as->rep_pkt.body);
pkt_init_empty(&as->rep_pkt, P_REP);
}
return (A_PENDING);
}
/*
 * If we got 0, then we hit EOF. Process the data and release
 * the timeout.
 */
assert(n == 0);
assert(as->ev_repfd != NULL);
event_release(as->ev_repfd);
as->ev_repfd = NULL;
assert(as->ev_reptimeout != NULL);
event_release(as->ev_reptimeout);
as->ev_reptimeout = NULL;
as->state = s_processrep;
aclose(as->repfd);
return (A_CONTINUE);
}
/*
 * After we have read in all of the rep data, we process it and send
 * it out as a REP packet.
 *
 * Builds as->rep_pkt from as->repbuf, sets the retry count from
 * CNF_REP_TRIES and always moves on to s_sendrep (returns A_CONTINUE).
 * Neither `action` nor `pkt` is used.
 */
static action_t
s_processrep(
struct active_service * as,
action_t action,
pkt_t * pkt)
{
char *tok, *repbuf;
(void)action; /* Quiet unused parameter warning */
(void)pkt; /* Quiet unused parameter warning */
/*
 * Copy the rep lines into the outgoing packet.
 *
 * If this line is a CONNECT, translate it
 * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
 * Example:
 *
 * CONNECT DATA 4 MESG 5 INDEX 6
 *
 * The tags are arbitrary. The handles are in the DATA_FD pool.
 * We need to map these to security streams and pass them back
 * to the amanda server. If the handle is -1, then we don't map.
 */
/* a leading "KENCRYPT\n" line (9 bytes) is consumed here, not parsed below */
if (strncmp_const(as->repbuf,"KENCRYPT\n") == 0) {
amandad_kencrypt = KENCRYPT_WILL_DO;
repbuf = g_strdup(as->repbuf + 9);
} else {
repbuf = g_strdup(as->repbuf);
}
amfree(as->rep_pkt.body);
pkt_init_empty(&as->rep_pkt, P_REP);
/* strtok() cuts up the private copy `repbuf`; as->repbuf stays intact */
tok = strtok(repbuf, " ");
if (tok == NULL)
goto error;
if (g_str_equal(tok, "CONNECT")) {
char *line, *nextbuf;
/* Save the entire line */
line = strtok(NULL, "\n");
/* Save the buf following the line */
nextbuf = strtok(NULL, "");
if (line == NULL || nextbuf == NULL)
goto error;
pkt_cat(&as->rep_pkt, "CONNECT");
/* loop over the id/handle pairs */
for (;;) {
/* id */
tok = strtok(line, " ");
line = NULL; /* keep working from line */
if (tok == NULL)
break;
pkt_cat(&as->rep_pkt, " %s", tok);
/* handle */
tok = strtok(NULL, " \n");
if (tok == NULL)
goto error;
/* convert the handle into something the server can process */
pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
}
pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
} else {
/* not a CONNECT reply, or one we could not parse: append the
 * original, unmodified reply buffer to the packet */
error:
pkt_cat(&as->rep_pkt, "%s", as->repbuf);
}
/*
 * We've setup our REP packet in as->rep_pkt. Now move to the transmission
 * state.
 */
as->state = s_sendrep;
as->repretry = getconf_int(CNF_REP_TRIES);
amfree(repbuf);
return (A_CONTINUE);
}
/*
 * Transmission state: send the REP packet built from the child's reply,
 * then wait up to ACK_TIMEOUT for the server's answer in s_ackwait.
 * Neither `action` nor `pkt` is used.
 */
static action_t
s_sendrep(
    struct active_service *	as,
    action_t			action,
    pkt_t *			pkt)
{
    security_handle_t *peer = as->security_handle;

    (void)pkt;		/* Quiet unused parameter warning */
    (void)action;	/* Quiet unused parameter warning */

    /* put the reply on the wire and ask to be called back for the ACK */
    do_sendpkt(peer, &as->rep_pkt);
    security_recvpkt(peer, protocol_recv, as, ACK_TIMEOUT);

    as->state = s_ackwait;
    return A_PENDING;
}
/*
 * State handler: wait for the server to ACK the REP we just sent it.
 *
 * action - A_TIMEOUT when the wait expired, otherwise expected to be
 *          A_RECVPKT (asserted)
 * pkt    - the received packet; only read when action is A_RECVPKT
 *
 * Returns:
 *   A_CONTINUE  to resend the REP (timeout with retries left, or a
 *               duplicate P_REQ arrived)
 *   A_FINISH    when the retries are exhausted or no stream could be opened
 *   A_SENDNAK   when the packet is neither a P_REQ nor a P_ACK
 *   A_PENDING   when at least one stream is open; the event handlers on
 *               the streams take over from here
 */
static action_t
s_ackwait(
    struct active_service *as,
    action_t action,
    pkt_t *pkt)
{
    struct datafd_handle *dh;
    int npipes;

    /*
     * If we got a timeout, try again, but eventually give up.
     */
    if (action == A_TIMEOUT) {
        if (--as->repretry > 0) {
            as->state = s_sendrep;
            return (A_CONTINUE);
        }
        dbprintf(_("timeout waiting for ACK for our REP\n"));
        return (A_FINISH);
    }

    assert(action == A_RECVPKT);

    if (pkt->type == P_REQ) {
        dbprintf(_("received dup P_REQ packet, resending REP\n"));
        as->state = s_sendrep;
        return (A_CONTINUE);
    }

    if (pkt->type != P_ACK)
        return (A_SENDNAK);

    /* only report the ACK once we know the packet really is one */
    amandad_debug(1, _("received ACK, now opening streams\n"));

    if (amandad_kencrypt == KENCRYPT_WILL_DO) {
        amandad_kencrypt = KENCRYPT_YES;
    }

    /*
     * Got the ack, now open the pipes
     */
    for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
        if (dh->netfd == NULL)
            continue;
        dbprintf("opening security stream for fd %d\n",
                 (int)(dh - as->data) + DATA_FD_OFFSET);
        if (security_stream_accept(dh->netfd) < 0) {
            dbprintf(_("stream %td accept failed: %s\n"),
                     dh - &as->data[0],
                     security_geterror(as->security_handle));
            security_stream_close(dh->netfd);
            dh->netfd = NULL;
            continue;
        }

        /* setup an event for reads from it.  As a special case, don't start
         * listening on as->data[0] until we read some data on another fd, if
         * the service is sendbackup.  This ensures that we send a MESG or
         * INDEX token before any DATA tokens, as dumper assumes.  This is a
         * hack, if that wasn't already obvious! */
        if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
            dh->ev_read = event_create((event_id_t)dh->fd_read, EV_READFD,
                                       process_readnetfd, dh);
            event_activate(dh->ev_read);
        } else {
            amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
        }

        security_stream_read(dh->netfd, process_writenetfd, dh);
    }

    /*
     * Pipes are open, so auth them.  Count them at the same time.
     */
    for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
        if (dh->netfd == NULL)
            continue;
        if (security_stream_auth(dh->netfd) < 0) {
            security_stream_close(dh->netfd);
            dh->netfd = NULL;
            /* Either event may be unset here (ev_read is skipped above for
             * sendbackup's data fd, ev_write is never created in this
             * function), so only release what exists -- the same guard
             * service_delete uses. */
            if (dh->ev_read != NULL) {
                event_release(dh->ev_read);
                dh->ev_read = NULL;
            }
            if (dh->ev_write != NULL) {
                event_release(dh->ev_write);
                dh->ev_write = NULL;
            }
        } else {
            npipes++;
        }
    }

    /*
     * If no pipes are open, then we're done.  Otherwise, just start running.
     * The event handlers on all of the pipes will take it from here.
     */
    amandad_debug(1, _("at end of s_ackwait, npipes is %d\n"), npipes);
    if (npipes == 0)
        return (A_FINISH);

    security_close(as->security_handle);
    as->security_handle = NULL;
    return (A_PENDING);
}
/*
 * Event callback for the reply descriptor: feeds an A_RECVREP action
 * (with no packet) into the state machine of the service passed as cookie.
 */
static void
repfd_recv(
    void *cookie)
{
    struct active_service *as = (struct active_service *)cookie;

    /* the callback must only fire for a live service with a reply event */
    assert(as != NULL);
    assert(as->ev_repfd != NULL);

    state_machine(as, A_RECVREP, NULL);
}
/*
 * Drain pending data from the service's stderr pipe without blocking.
 *
 * Polls as->errfd with a zero timeout and, when it is readable, hands it
 * to errfd_recv().  Does nothing when the service has no errfd event
 * (as->ev_errfd is NULL).  Used to pick up error output before the REP
 * packet is sent.
 *
 * cookie - the struct active_service to process
 */
static void
process_errfd(
    void *cookie)
{
    struct active_service *as = cookie;
    SELECT_ARG_TYPE readset;
    struct timeval tv;
    int nfound;

    /* Process errfd before sending the REP packet */
    if (as->ev_errfd == NULL)
        return;

    /* an all-zero timeout turns select() into a non-blocking poll */
    memset(&tv, 0, sizeof(tv));
    FD_ZERO(&readset);
    FD_SET(as->errfd, &readset);
    nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);

    /* select() returns -1 on failure and then leaves readset unspecified,
     * so only trust FD_ISSET() when it reported ready descriptors */
    if (nfound > 0 && FD_ISSET(as->errfd, &readset)) {
        errfd_recv(as);
    }
}
/*
* Called when a errfd has received data
*/
static void
errfd_recv(
void * cookie)
{
struct active_service *as = cookie;
char buf[NETWORK_BLOCK_BYTES + 1];
int n;
gsize buflen = 0;
char *r;
assert(as != NULL);
assert(as->ev_errfd != NULL);
n = read(as->errfd, buf, NETWORK_BLOCK_BYTES);
/*
* Append whatever was read from the error file descriptor into the error
* buffer - or the read error message if there is one.
*
* In case when no data is read, or a read error, we release the event.
*/
switch (n) {
case -1:
g_snprintf(buf, NETWORK_BLOCK_BYTES + 1,
"error reading stderr or service: %s\n", strerror(errno));
/* Fall through */
case 0:
event_release(as->ev_errfd);
as->ev_errfd = NULL;
break;
default:
buf[n] = '\0';
buflen = n;
}
if (buflen) {
GString *strbuf = g_string_new(as->errbuf);
g_string_append(strbuf, buf);
g_free(as->errbuf);
as->errbuf = g_string_free(strbuf, FALSE);
}
/* for each line terminate by '\n' */
while (as->errbuf != NULL && (r = strchr(as->errbuf, '\n')) != NULL) {
char *s;
*r = '\0';
s = g_strdup_printf("ERROR service %s: %s\n",
services[as->service].name, as->errbuf);
/* Add to repbuf, error message will be in the REP packet if it
* is not already sent
*/
n = strlen(s);
expand_reply_buffer(as, n);
memcpy(as->repbuf + as->repbufsize, s, n);
as->repbufsize += n;
dbprintf("%s", s);
amfree(s);
/* remove first line from buffer */
r++;
s = g_strdup(r);
amfree(as->errbuf);
as->errbuf = s;
}
}
/*
 * Event callback for the reply timeout: feeds an A_TIMEOUT action (with no
 * packet) into the state machine of the service passed as cookie.
 */
static void
timeout_repfd(
    void *cookie)
{
    struct active_service *as = (struct active_service *)cookie;

    /* the callback must only fire for a live service with a timeout event */
    assert(as != NULL);
    assert(as->ev_reptimeout != NULL);

    state_machine(as, A_TIMEOUT, NULL);
}
/*
 * Receive callback for a security handle.
 *
 * cookie - the struct active_service the receive was armed for
 * pkt    - the received packet (only used when status is S_OK)
 * status - outcome reported by the security layer
 *
 * S_OK logs the packet and drives the state machine with A_RECVPKT,
 * S_TIMEOUT drives it with A_TIMEOUT, and S_ERROR is only logged.  Any
 * other status value is ignored.
 */
static void
protocol_recv(
    void *cookie,
    pkt_t *pkt,
    security_status_t status)
{
    struct active_service *as = cookie;

    assert(as != NULL);

    if (status == S_OK) {
        dbprintf(_("received %s pkt:\n<<<<<\n%s>>>>>\n"),
                 pkt_type2str(pkt->type), pkt->body);
        state_machine(as, A_RECVPKT, pkt);
    } else if (status == S_TIMEOUT) {
        dbprintf(_("timeout\n"));
        state_machine(as, A_TIMEOUT, NULL);
    } else if (status == S_ERROR) {
        /* nothing is fed to the state machine for a receive error */
        dbprintf(_("receive error: %s\n"),
                 security_geterror(as->security_handle));
    }
}
/*
 * This is a generic relay function that just reads data from one of
 * the process's pipes and passes it up the equivalent security_stream_t.
 *
 * cookie - the struct datafd_handle whose fd_read became readable
 *
 * Behaviour by read() result:
 *   error - a NAK describing the broken descriptor is sent and the service
 *           is deleted
 *   EOF   - this handle's read event and network stream are closed; when no
 *           stream that still matters remains open the service is deleted
 *   data  - the bytes are written to the handle's network stream
 *
 * For sendbackup, the data read on as->data[1] is additionally scanned for
 * info_end_str; once it has been seen, reading from as->data[0] is started
 * (s_ackwait deliberately leaves that descriptor unregistered).
 */
static void
process_readnetfd(
    void *cookie)
{
    pkt_t nak;
    struct datafd_handle *dh = cookie;
    struct active_service *as = dh->as;
    ssize_t n;
    gboolean start_sendbackup_data = FALSE;

    nak.body = NULL;

    do {
        n = read(dh->fd_read, as->databuf, sizeof(as->databuf));
    } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));

    /*
     * Process has died.
     */
    if (n < 0) {
        pkt_init(&nak, P_NAK, _("A ERROR data descriptor %d broken: %s\n"),
                 dh->fd_read, strerror(errno));
        goto sendnak;
    }

    /*
     * Process has closed the pipe.  Just remove this event handler.
     * If all pipes are closed, shut down this service.
     */
    if (n == 0) {
        struct datafd_handle *other;

        event_release(dh->ev_read);
        dh->ev_read = NULL;
        if (as->thread && dh->shm_ring) {
            g_thread_join(as->thread);
            close_consumer_shm_ring(dh->shm_ring);
            dh->shm_ring = NULL;
            as->thread = NULL;
        }
        security_stream_close(dh->netfd);
        dh->netfd = NULL;

        /* Keep the service alive while any stream is still open.  The one
         * exception: for sendbackup, an open as->data[1] does not count
         * until info_end_str has been seen. */
        for (other = &as->data[0]; other < &as->data[DATA_FD_COUNT]; other++) {
            if (other->netfd != NULL &&
                (as->service != SERVICE_SENDBACKUP ||
                 as->seen_info_end ||
                 other != &as->data[1])) {
                return;
            }
        }
        service_delete(as);
        return;
    }

    /* Handle the special case of recognizing "sendbackup info end"
     * from sendbackup's MESG fd */
    if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end &&
        dh == &as->data[1]) {
        /* make a buffer containing the combined data from info_end_buf
         * and what we've read this time, and search it for info_end_str.
         * This includes a NUL byte for strstr's sanity.  g_malloc() is used
         * so that an allocation failure cannot yield a NULL buffer. */
        char *combined_buf = g_malloc(INFO_END_LEN + (size_t)n + 1);

        memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
        memcpy(combined_buf + INFO_END_LEN, as->databuf, (size_t)n);
        combined_buf[INFO_END_LEN + n] = '\0';

        as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);

        /* fill info_end_buf from the tail end of combined_buf */
        memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
        amfree(combined_buf);

        /* if we did see info_end_str, start reading the data fd (fd 0) */
        if (as->seen_info_end) {
            start_sendbackup_data = TRUE;
        } else {
            amandad_debug(1, "sendbackup header info still not complete\n");
        }
    }

    if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
        /* stream has croaked */
        event_release(dh->ev_read);
        dh->ev_read = NULL;
        /* aclose() rather than close(): service_delete closes fd_read again
         * later, so the descriptor has to be invalidated here */
        aclose(dh->fd_read);
        return;
    }

    if (start_sendbackup_data) {
        struct datafd_handle *data_dh = &as->data[0];

        amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
        data_dh->ev_read = event_create((event_id_t)data_dh->fd_read,
                                        EV_READFD, process_readnetfd, data_dh);
        event_activate(data_dh->ev_read);
        data_dh->shm_ring = as->shm_ring;
        if (data_dh->shm_ring) {
            as->thread = g_thread_create(shm_ring_thread, (gpointer)data_dh,
                                         TRUE, NULL);
        }
    }
    return;

sendnak:
    do_sendpkt(as->security_handle, &nak);
    service_delete(as);
    amfree(nak.body);
}
/*
 * Thread entry point (started via g_thread_create in process_readnetfd).
 *
 * cookie is a struct datafd_handle.  Sets the consumer sizes of its
 * shm_ring (NETWORK_BLOCK_BYTES*8 and NETWORK_BLOCK_BYTES), then hands the
 * ring and the handle's network stream to shm_ring_to_security_stream().
 * Always returns NULL.
 */
static gpointer
shm_ring_thread(
    gpointer cookie)
{
    struct datafd_handle *dh = (struct datafd_handle *)cookie;

    shm_ring_consumer_set_size(dh->shm_ring,
                               NETWORK_BLOCK_BYTES*8,
                               NETWORK_BLOCK_BYTES);
    shm_ring_to_security_stream(dh->shm_ring, dh->netfd, NULL);

    return NULL;
}
/*
 * This is a generic relay function that just reads data from one of
 * the security_stream_t and passes it down the equivalent process's pipe.
 *
 * cookie - the struct datafd_handle the stream belongs to
 * buf    - data received from the stream
 * size   - number of bytes in buf; a value <= 0 means the stream is done
 *
 * With size > 0 the data is written to the handle's fd_write.  Otherwise
 * the handle is torn down (pipe ends, read event, shm-ring consumer thread
 * and network stream), and when no handle of the service has an open
 * stream left the whole service is deleted.  A handle whose fd_write is
 * <= 0 is only reported in the debug log.
 */
static void
process_writenetfd(
    void *cookie,
    void *buf,
    ssize_t size)
{
    struct datafd_handle *dh;
    struct datafd_handle *dh_end;
    struct active_service *as;

    assert(cookie != NULL);
    dh = cookie;
    as = dh->as;

    if (dh->fd_write <= 0) {
        dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
        return;
    }

    if (size > 0) {
        size_t written = full_write(dh->fd_write, buf, (size_t)size);

        /* do not drop data silently: at least record a short write */
        if (written != (size_t)size) {
            dbprintf("process_writenetfd: wrote only %zu of %zd bytes to fd %d\n",
                     written, size, dh->fd_write);
        }
        return;
    }

    /* end of stream (or stream error): tear this handle down */
    g_debug("process_writenetfd %d: %zd", dh->fd_write, size);
    aclose(dh->fd_write);
    if (as->thread && dh->shm_ring) {
        g_thread_join(as->thread);
        close_consumer_shm_ring(dh->shm_ring);
        dh->shm_ring = NULL;
        as->thread = NULL;
    }
    if (dh->ev_read) {
        event_release(dh->ev_read);
        dh->ev_read = NULL;
        aclose(dh->fd_read);
    }
    if (dh->netfd) {
        security_stream_close(dh->netfd);
        dh->netfd = NULL;
    }

    /* the service stays alive as long as any handle has an open stream */
    dh_end = &as->data[DATA_FD_COUNT];
    for (dh = &as->data[0]; dh < dh_end; dh++) {
        if (dh->netfd != NULL) {
            return;
        }
    }
    service_delete(as);
}
/*
 * Map a local stream handle onto a stream id the amanda server can use.
 *
 * handle is a number in the range DATA_FD_OFFSET ..
 * DATA_FD_OFFSET + DATA_FD_COUNT - 1; it is NOT a file descriptor.
 *
 * Returns the id to put in the REP packet, or -1 when the handle is
 * negative, out of range, already has a network stream, or no stream could
 * be obtained from the security layer (that last case is logged).
 */
static int
allocstream(
    struct active_service *as,
    int handle)
{
    struct datafd_handle *slot;

    /* a negative handle means "don't map"; anything else must be in range */
    if (handle < 0 ||
        handle < DATA_FD_OFFSET ||
        handle >= DATA_FD_OFFSET + DATA_FD_COUNT) {
        return -1;
    }

    /* refuse a slot that is already bound to a network stream */
    slot = &as->data[handle - DATA_FD_OFFSET];
    if (slot->netfd != NULL) {
        return -1;
    }

    /* get a stream from the security layer and report its numeric id */
    slot->netfd = security_stream_server(as->security_handle);
    if (slot->netfd != NULL) {
        return security_stream_id(slot->netfd);
    }

    dbprintf(_("couldn't open stream to server: %s\n"),
             security_geterror(as->security_handle));
    return -1;
}
/*
* Create a new service instance
*/
static struct active_service *
service_new(
security_handle_t * security_handle,
const char * cmd,
service_t service,
const char * arguments)
{
int i;
int data_read[DATA_FD_COUNT + 2][2];
int data_write[DATA_FD_COUNT + 2][2];
struct active_service *as;
pid_t pid;
int newfd;
char *peer_name;
char *amanda_remote_host_env[2];
char **env;
char **service_argv;
assert(security_handle != NULL);
assert(cmd != NULL);
assert(arguments != NULL);
/* a plethora of pipes */
/* data_read[0] : stdin
* data_write[0] : stdout
* data_read[1], data_write[1] : first stream
* data_read[2], data_write[2] : second stream
* data_read[3], data_write[3] : third stream
* data_write[4] : stderr
*/
for (i = 0; i < DATA_FD_COUNT + 1; i++) {
if (pipe(data_read[i]) < 0) {
error(_("pipe: %s\n"), strerror(errno));
/*NOTREACHED*/
}
if (pipe(data_write[i]) < 0) {
error(_("pipe: %s\n"), strerror(errno));
/*NOTREACHED*/
}
}
if (pipe(data_write[STDERR_PIPE]) < 0) {
error(_("pipe: %s\n"), strerror(errno));
/*NOTREACHED*/
}
as = g_new0(struct active_service, 1);
as->cmd = g_strdup(cmd);
as->arguments = g_strdup(arguments);
as->send_partial_reply = 0;
if (service == SERVICE_SENDSIZE) {
g_option_t *g_options;
char *option_str, *p;
option_str = g_strdup(as->arguments+8);
p = strchr(option_str,'\n');
if (p) *p = '\0';
g_options = parse_g_options(option_str, 1);
if (am_has_feature(g_options->features, fe_partial_estimate)) {
as->send_partial_reply = 1;
}
free_g_options(g_options);
amfree(option_str);
} else if (service == SERVICE_SENDBACKUP) {
g_option_t *g_options;
char *option_str, *p;
option_str = g_strdup(as->arguments+8);
p = strchr(option_str,'\n');
if (p) *p = '\0';
g_options = parse_g_options(option_str, 1);
as->data_shm_control_name = g_strdup(g_options->data_shm_control_name);
if (!as->data_shm_control_name) {
char *errmsg = NULL;
as->shm_ring = shm_ring_create(&errmsg);
if (!as->shm_ring) {
g_free(errmsg);
}
}
free_g_options(g_options);
amfree(option_str);
}
switch(pid = fork()) {
case -1:
error(_("could not fork service %s: %s\n"), cmd, strerror(errno));
/*NOTREACHED*/
default:
/*
* The parent. Close the far ends of our pipes and return.
*/
as->security_handle = security_handle;
as->state = NULL;
as->service = service;
as->pid = pid;
as->seen_info_end = FALSE;
/* fill in info_end_buf with non-null characters */
memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
/* write to the request pipe */
aclose(data_read[0][0]);
as->reqfd = data_read[0][1];
/*
* read from the reply pipe
*/
as->repfd = data_write[0][0];
aclose(data_write[0][1]);
as->ev_repfd = NULL;
as->repbuf = NULL;
as->repbufsize = 0;
as->bufsize = 0;
as->repretry = 0;
as->rep_pkt.body = NULL;
/*
* read from the stderr pipe
*/
as->errfd = data_write[STDERR_PIPE][0];
aclose(data_write[STDERR_PIPE][1]);
as->ev_errfd = NULL;
as->errbuf = NULL;
/*
* read from the rest of the general-use pipes
* (netfds are opened as the client requests them)
*/
for (i = 0; i < DATA_FD_COUNT; i++) {
aclose(data_read[i + 1][1]);
aclose(data_write[i + 1][0]);
as->data[i].fd_read = data_read[i + 1][0];
as->data[i].fd_write = data_write[i + 1][1];
as->data[i].ev_read = NULL;
as->data[i].ev_write = NULL;
as->data[i].netfd = NULL;
as->data[i].as = as;
}
/* add it to the service queue */
/* increment the active service count */
serviceq = g_slist_append(serviceq, (gpointer)as);
return (as);
case 0:
/*
* The child. Put our pipes in their advertised locations
* and start up.
*/
/* set up the AMANDA_AUTHENTICATED_PEER env var so child services
* can use it to authenticate */
peer_name = security_get_authenticated_peer_name(security_handle);
amanda_remote_host_env[0] = NULL;
amanda_remote_host_env[1] = NULL;
if (*peer_name) {
amanda_remote_host_env[0] =
g_strdup_printf("AMANDA_AUTHENTICATED_PEER=%s", peer_name);
}
/*
* The data stream is stdin in the new process
*/
if (dup2(data_read[0][0], 0) < 0) {
error(_("dup %d to %d failed: %s\n"), data_read[0][0], 0,
strerror(errno));
/*NOTREACHED*/
}
aclose(data_read[0][0]);
aclose(data_read[0][1]);
/*
* The reply stream is stdout
*/
if (dup2(data_write[0][1], 1) < 0) {
error(_("dup %d to %d failed: %s\n"), data_write[0][1], 1,
strerror(errno));
}
aclose(data_write[0][0]);
aclose(data_write[0][1]);
for (i = 0; i < DATA_FD_COUNT; i++) {
aclose(data_read[i + 1][0]);
aclose(data_write[i + 1][1]);
}
/*
* Make sure they are not open in the range DATA_FD_OFFSET to
* DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
*/
for (i = 0; i < DATA_FD_COUNT; i++) {
while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
newfd = dup(data_read[i + 1][1]);
if(newfd == -1)
error(_("Can't dup out off DATA_FD range"));
data_read[i + 1][1] = newfd;
}
while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
newfd = dup(data_write[i + 1][0]);
if(newfd == -1)
error(_("Can't dup out off DATA_FD range"));
data_write[i + 1][0] = newfd;
}
}
while(data_write[4][0] >= DATA_FD_OFFSET &&
data_write[4][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
newfd = dup(data_write[4][0]);
if (newfd == -1)
error(_("Can't dup out off DATA_FD range"));
data_write[4][0] = newfd;
}
while(data_write[4][1] >= DATA_FD_OFFSET &&
data_write[4][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
newfd = dup(data_write[4][1]);
if (newfd == -1)
error(_("Can't dup out off DATA_FD range"));
data_write[4][1] = newfd;
}
for (i = 0; i < DATA_FD_COUNT*2; i++)
close(DATA_FD_OFFSET + i);
/*
* The rest start at the offset defined in amandad.h, and continue
* through the internal defined.
*/
for (i = 0; i < DATA_FD_COUNT; i++) {
if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
error(_("dup %d to %d failed: %s\n"), data_read[i + 1][1],
i + DATA_FD_OFFSET, strerror(errno));
}
aclose(data_read[i + 1][1]);
if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
error(_("dup %d to %d failed: %s\n"), data_write[i + 1][0],
i + DATA_FD_OFFSET, strerror(errno));
}
aclose(data_write[i + 1][0]);
}
service_argv = g_new0(char *, 6);
service_argv[0] = g_strdup(cmd);
service_argv[1] = g_strdup("amandad");
service_argv[2] = g_strdup(auth);
if (as->data_shm_control_name) {
service_argv[3] = g_strdup("--shm-name");
service_argv[4] = g_strdup(as->data_shm_control_name);
service_argv[5] = (char *)NULL;
} else if (as->shm_ring && as->shm_ring->shm_control_name) {
service_argv[3] = g_strdup("--shm-name");
service_argv[4] = g_strdup(as->shm_ring->shm_control_name);
service_argv[5] = (char *)NULL;
} else {
service_argv[3] = (char *)NULL;
}
g_debug("service_argv[0] = %s\n", service_argv[0]);
g_debug("service_argv[1] = %s\n", service_argv[1]);
g_debug("service_argv[2] = %s\n", service_argv[2]);
g_debug("service_argv[3] = %s\n", service_argv[3]);
g_debug("service_argv[4] = %s\n", service_argv[4]);
g_debug("service_argv[5] = %s\n", service_argv[5]);
/* close all unneeded fd */
close(STDERR_FILENO);
dup2(data_write[STDERR_PIPE][1], 2);
aclose(data_write[STDERR_PIPE][0]);
aclose(data_write[STDERR_PIPE][1]);
safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
env = safe_env_full(amanda_remote_host_env);
execve(cmd, service_argv, env);
error(_("could not exec service %s: %s\n"), cmd, strerror(errno));
free_env(env);
/*NOTREACHED*/
}
return NULL;
}
/*
 * Helper for service_delete(): poll for the exit of a child without
 * blocking in waitpid().  Between polls it sleeps, starting at 100000 ns
 * and doubling each time, for at most 15 rounds.
 *
 * Returns the result of the last waitpid(): 0 when the child is still
 * running, the child's pid when it was reaped, -1 on error (errno is the
 * one set by waitpid).
 */
static pid_t
service_wait_for_exit(
    pid_t child)
{
    long long delay = 100000;
    struct timespec tdelay;
    int tries;
    pid_t pid;

    pid = waitpid(child, NULL, WNOHANG);
    for (tries = 0; tries < 15 && pid == 0; tries++) {
        tdelay.tv_sec = delay/1000000000;
        tdelay.tv_nsec = delay%1000000000;
        nanosleep(&tdelay, NULL);
        delay *= 2;
        pid = waitpid(child, NULL, WNOHANG);
    }
    return pid;
}

/*
 * Unallocate a service instance.
 *
 * Closes the service's pipes (draining stderr once more first), releases
 * its events, streams and security handle, waits for the child process --
 * sending SIGTERM when it does not exit by itself -- and removes the
 * service from serviceq.  Exits the program when exit_on_qlength is 0 and
 * the queue became empty.
 *
 * The struct itself is deliberately not freed (see the note near the end),
 * so every released resource is reset to NULL to keep a late access from
 * touching freed memory.
 */
static void
service_delete(
    struct active_service *as)
{
    int i;
    pid_t pid;
    struct datafd_handle *dh;

    assert(as != NULL);

    amandad_debug(1, _("closing service: %p\n"), as);
    amandad_debug(1, _("closing service: %s\n"),
                  (as->cmd)?as->cmd:_("??UNKONWN??"));

    assert(as->cmd != NULL);
    amfree(as->cmd);

    assert(as->arguments != NULL);
    amfree(as->arguments);

    if (as->reqfd != -1)
        aclose(as->reqfd);
    if (as->repfd != -1)
        aclose(as->repfd);
    if (as->errfd != -1) {
        /* pick up whatever the service still wrote to stderr */
        process_errfd(as);
        aclose(as->errfd);
    }

    if (as->ev_repfd != NULL) {
        event_release(as->ev_repfd);
        as->ev_repfd = NULL;
    }
    if (as->ev_reptimeout != NULL) {
        event_release(as->ev_reptimeout);
        as->ev_reptimeout = NULL;
    }
    if (as->ev_errfd != NULL) {
        event_release(as->ev_errfd);
        as->ev_errfd = NULL;
    }

    for (i = 0; i < DATA_FD_COUNT; i++) {
        dh = &as->data[i];

        aclose(dh->fd_read);
        aclose(dh->fd_write);

        if (dh->netfd != NULL) {
            security_stream_close(dh->netfd);
            dh->netfd = NULL;
        }
        if (dh->ev_read != NULL) {
            event_release(dh->ev_read);
            dh->ev_read = NULL;
        }
        if (dh->ev_write != NULL) {
            event_release(dh->ev_write);
            dh->ev_write = NULL;
        }
    }

    if (as->security_handle != NULL) {
        security_close(as->security_handle);
        as->security_handle = NULL;
    }

    /* wait for the process to terminate */
    assert(as->pid > 0);
    pid = service_wait_for_exit(as->pid);
    if (pid == 0) {
        g_debug("Process %d failed to exit", (int)as->pid);

        /* try to kill the process; if this fails, then it's already dead
         * and likely some of the other zombie cleanup ate its brains, so we
         * don't bother to waitpid for it */
        if (kill(as->pid, SIGTERM) == 0) {
            pid = service_wait_for_exit(as->pid);
            if (pid == 0) {
                g_debug("Process %d failed to exit", (int)as->pid);
            }
        } else {
            g_debug("Kill of process %d failed: %s", (int)as->pid,
                    strerror(errno));
        }
    }
    /* only a real waitpid() error is reported; a child that was reaped
     * above is the normal case and needs no message */
    if (pid < 0) {
        g_debug("Waitpid for process %d failed: %s", (int)as->pid,
                strerror(errno));
    }

    serviceq = g_slist_remove(serviceq, (gpointer)as);

    amfree(as->repbuf);
    as->bufsize = as->repbufsize = 0;
    amfree(as->rep_pkt.body);
    amfree(as->errbuf);

    /* NOTE: 'as' is intentionally not freed.  The original code carries the
     * remark that process_writenetfd can be called again for this service
     * after it has been deleted; the reason is not known. */

    if (exit_on_qlength == 0 && g_slist_length(serviceq) == 0) {
        dbclose();
        exit(0);
    }
}
/*
 * Write a buffer to the service's request pipe from a forked child, so the
 * caller never blocks on a full pipe.
 *
 * The child closes its copy of as->repfd, writes size bytes from bufp to
 * as->reqfd with full_write() and exits with status 0 when everything was
 * written, 1 otherwise.  The parent polls the child once without blocking
 * and returns immediately.
 *
 * Returns 0 in the parent when the fork succeeded, -1 when it failed.
 */
static int
writebuf(
    struct active_service *as,
    const void *bufp,
    size_t size)
{
    pid_t child = fork();

    if (child == -1) {
        return -1;
    }

    if (child == 0) {
        /* this is the child */
        size_t written;

        close(as->repfd);
        written = full_write(as->reqfd, bufp, size);
        exit(written != size);
        /* NOTREACHED */
    }

    /* this is the parent */
    waitpid(child, NULL, WNOHANG);
    return 0;
}
/*
 * Log a packet and send it on a security handle.
 *
 * The packet type and body are always written to the debug log.  Returns
 * the result of security_sendpkt() when a handle is given, or 1 when
 * handle is NULL (nothing is sent in that case).
 */
static ssize_t
do_sendpkt(
    security_handle_t *handle,
    pkt_t *pkt)
{
    dbprintf(_("sending %s pkt:\n<<<<<\n%s>>>>>\n"),
             pkt_type2str(pkt->type), pkt->body);

    if (handle == NULL) {
        return 1;
    }
    return security_sendpkt(handle, pkt);
}
/*
 * Return the name of a state handler as a string, for log messages.
 * Handlers that are not in the table yield the (translated) text
 * "INVALID STATE".
 */
static const char *
state2str(
    state_t state)
{
    /* one entry per known state handler: the function and its own name */
    static const struct {
        state_t handler;
        const char *name;
    } known_states[] = {
#define X(s) { s, stringize(s) }
        X(s_sendack),
        X(s_repwait),
        X(s_processrep),
        X(s_sendrep),
        X(s_ackwait),
#undef X
    };
    const int count = (int)(sizeof(known_states) / sizeof(known_states[0]));
    int idx = 0;

    while (idx < count) {
        if (known_states[idx].handler == state) {
            return known_states[idx].name;
        }
        idx++;
    }
    return (_("INVALID STATE"));
}
/*
 * Return the name of an action as a string, for log messages.  Values
 * that are not in the table yield the (translated) text "UNKNOWN ACTION".
 */
static const char *
action2str(
    action_t action)
{
    /* one entry per known action: the value and its own name */
    static const struct {
        action_t value;
        const char *name;
    } known_actions[] = {
#define X(a) { a, stringize(a) }
        X(A_START),
        X(A_RECVPKT),
        X(A_RECVREP),
        X(A_PENDING),
        X(A_FINISH),
        X(A_CONTINUE),
        X(A_SENDNAK),
        X(A_TIMEOUT),
#undef X
    };
    const int count = (int)(sizeof(known_actions) / sizeof(known_actions[0]));
    int idx = 0;

    while (idx < count) {
        if (known_actions[idx].value == action) {
            return known_actions[idx].name;
        }
        idx++;
    }
    return (_("UNKNOWN ACTION"));
}
/*
 * Look up a security configuration value by name.
 *
 * "kencrypt" is answered locally: "yes" when amandad_kencrypt is
 * KENCRYPT_YES, otherwise NULL.  Every other name is passed on to
 * generic_client_get_security_conf() together with arg.
 *
 * Returns NULL for a NULL or empty name and whenever the looked-up value
 * is missing or an empty string.
 */
static char *
amandad_get_security_conf(
    char *string,
    void *arg G_GNUC_UNUSED)
{
    char *value;

    if (string == NULL || *string == '\0') {
        return NULL;
    }

    if (g_str_equal(string, "kencrypt")) {
        return (amandad_kencrypt == KENCRYPT_YES) ? "yes" : NULL;
    }

    value = generic_client_get_security_conf(string, arg);
    if (value == NULL || strlen(value) == 0) {
        return NULL;
    }
    return value;
}