Blob Blame History Raw
/*
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
 * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 *
 * Contact information: Carbonite Inc., 756 N Pastoria Ave
 * Sunnyvale, CA 94085, or: http://www.zmanda.com
 */

#include "amanda.h"
#include "ipc-binary.h"

struct ipc_binary_proto_t {
    guint16 magic;
    guint16 n_cmds;
    ipc_binary_cmd_t *cmds;
};

/* extra flag to indicate that an argument exists */
#define IPC_BINARY_EXISTS (1 << 7)

struct ipc_binary_cmd_t {
    gboolean exists;
    guint8 *arg_flags;
    guint16 n_args;
};

#define MSG_HDR_LEN 10
#define ARG_HDR_LEN 6

/*
 * Utilities
 */

static void
expand_buffer(
    ipc_binary_buf_t *buf,
    gsize size)
{
    gsize new_len = buf->length + size;

    /* allocate space in the buffer if necessary */
    if (buf->offset + new_len > buf->size) {
	if (buf->offset != 0 && new_len <= buf->size) {
	    g_memmove(buf->buf,
		      buf->buf + buf->offset,
		      buf->length);
	    buf->offset = 0;
	} else {
	    buf->size = buf->offset + new_len;
	    buf->buf = g_realloc(buf->buf, buf->size);
	}
    }
}

static void
add_to_buffer(
    ipc_binary_buf_t *buf,
    gsize size,
    gpointer data)
{
    expand_buffer(buf, size);

    g_memmove(buf->buf + buf->offset + buf->length, data, size);
    buf->length += size;
}

static void
consume_from_buffer(
    ipc_binary_buf_t *buf,
    gsize size)
{
    g_assert(size <= buf->length);

    buf->length -= size;
    if (buf->length == 0)
	buf->offset = 0;
    else
	buf->offset += size;
}

static gboolean
all_args_present(
    ipc_binary_message_t *msg)
{
    int i;

    for (i = 0; i < msg->cmd->n_args; i++) {
	if (msg->args[i].data == NULL
		&& (msg->cmd->arg_flags[i] & IPC_BINARY_EXISTS)
		&& !(msg->cmd->arg_flags[i] & IPC_BINARY_OPTIONAL)) {
	    g_debug("ipc-binary message missing mandatory arg %d", i);
	    return FALSE;
	}
    }

    return TRUE;
}

/*
 * Creating a protocol
 */

ipc_binary_proto_t *
ipc_binary_proto_new(
    guint16 magic)
{
    ipc_binary_proto_t *prot = g_new(ipc_binary_proto_t, 1);

    prot->magic = magic;
    prot->n_cmds = 0;
    prot->cmds = NULL;

    return prot;
}

ipc_binary_cmd_t *
ipc_binary_proto_add_cmd(
    ipc_binary_proto_t *proto,
    guint16 id)
{
    g_assert(proto != NULL);
    g_assert(id != 0);

    if (id >= proto->n_cmds) {
	guint16 new_len = id+1;
	int i;

	proto->cmds = g_renew(ipc_binary_cmd_t, proto->cmds, new_len);
	for (i = proto->n_cmds; i < new_len; i++) {
	    proto->cmds[i].n_args = 0;
	    proto->cmds[i].exists = FALSE;
	    proto->cmds[i].arg_flags = NULL;
	}
	proto->n_cmds = new_len;
    }

    /* make sure this command hasn't been defined already */
    g_assert(!proto->cmds[id].exists);
    proto->cmds[id].exists = TRUE;

    return &proto->cmds[id];
}

void
ipc_binary_cmd_add_arg(
    ipc_binary_cmd_t *cmd,
    guint16 id,
    guint8 flags)
{
    g_assert(cmd != NULL);
    g_assert(id != 0);
    flags |= IPC_BINARY_EXISTS;

    if (id >= cmd->n_args) {
	guint16 new_len = id+1;
	int i;

	cmd->arg_flags = g_realloc(cmd->arg_flags, new_len);
	for (i = cmd->n_args; i < new_len; i++) {
	    cmd->arg_flags[i] = 0;
	}
	cmd->n_args = new_len;
    }

    /* make sure this arg hasn't been defined already */
    g_assert(cmd->arg_flags[id] == 0);

    cmd->arg_flags[id] = flags;
}

/*
 * Using a protocol
 */

ipc_binary_channel_t *
ipc_binary_new_channel(
    ipc_binary_proto_t *proto)
{
    ipc_binary_channel_t *chan;

    chan = g_new0(ipc_binary_channel_t, 1);
    chan->proto = proto;

    return chan;
}

void
ipc_binary_free_channel(
    ipc_binary_channel_t *chan)
{
    if (chan->in.buf)
	g_free(chan->in.buf);

    if (chan->out.buf)
	g_free(chan->out.buf);

    g_free(chan);
}

ipc_binary_message_t *
ipc_binary_new_message(
    ipc_binary_channel_t *chan,
    guint16 cmd_id)
{
    ipc_binary_message_t *msg = g_new0(ipc_binary_message_t, 1);
    ipc_binary_cmd_t *cmd;

    /* make sure this is a valid command */
    g_assert(chan != NULL);
    g_assert(cmd_id > 0 && cmd_id < chan->proto->n_cmds);
    g_assert(chan->proto->cmds[cmd_id].exists);
    cmd = &chan->proto->cmds[cmd_id];

    msg->chan = chan;
    msg->cmd = cmd;
    msg->cmd_id = cmd_id;
    msg->n_args = cmd->n_args;
    msg->args = g_malloc0(sizeof(*(msg->args)) * cmd->n_args);

    return msg;
}

void
ipc_binary_add_arg(
    ipc_binary_message_t *msg,
    guint16 arg_id,
    gsize size,
    gpointer data,
    gboolean take_memory)
{
    /* make sure this arg has not already been set for this message */
    g_assert(msg != NULL);
    g_assert(data != NULL);
    g_assert(arg_id > 0 && arg_id < msg->cmd->n_args);
    g_assert(msg->cmd->arg_flags[arg_id] & IPC_BINARY_EXISTS);
    g_assert(msg->args[arg_id].data == NULL);

    if (size == 0 && msg->cmd->arg_flags[arg_id] & IPC_BINARY_STRING) {
	size = strlen((gchar *)data);
    }

    if (!take_memory) {
	data = g_memdup(data, size);
    }

    msg->args[arg_id].len = size;
    msg->args[arg_id].data = data;
}

void
ipc_binary_free_message(
    ipc_binary_message_t *msg)
{
    int i;

    g_assert(msg != NULL);

    for (i = 0; i < msg->cmd->n_args; i++) {
	gpointer data = msg->args[i].data;
	if (data)
	    g_free(data);
    }

    g_free(msg->args);
    g_free(msg);
}

ipc_binary_message_t *
ipc_binary_read_message(
    ipc_binary_channel_t *chan,
    int fd)
{
    ipc_binary_message_t *msg;

    /* read data until we have a whole packet or until EOF */
    while (!(msg = ipc_binary_poll_message(chan))) {
	gssize bytes;

	if (errno)
	    return NULL;

	/* read directly into the buffer, instead of using ipc_binary_feed_data */
	expand_buffer(&chan->in, 32768);
	bytes = read(fd, chan->in.buf + chan->in.offset + chan->in.length, 32768);
	if (bytes < 0) {
	    /* error on read */
	    return NULL;
	} else if (!bytes) {
	    /* got EOF; if there are bytes left over, this is EIO */
	    if (chan->in.length) {
		g_warning("got EOF reading ipc-binary channel with %zd bytes un-processed",
			  chan->in.length);
		errno = EIO;
	    }

	    return NULL;
	} else {
	    /* add the data to the buffer */
	    chan->in.length += bytes;
	}
    }

    return msg;
}

int
ipc_binary_write_message(
    ipc_binary_channel_t *chan,
    int fd,
    ipc_binary_message_t *msg)
{
    gsize written;

    /* add the message to the queue */
    ipc_binary_queue_message(chan, msg);

    /* and write the outgoing buffer */
    written = full_write(fd, chan->out.buf + chan->out.offset, chan->out.length);
    consume_from_buffer(&chan->out, written);

    if (written < chan->out.length) {
	return -1;
    }

    return 0;
}

void
ipc_binary_feed_data(
    ipc_binary_channel_t *chan,
    gsize size,
    gpointer data)
{
    add_to_buffer(&chan->in, size, data);
}

void
ipc_binary_data_transmitted(
    ipc_binary_channel_t *chan,
    gsize size)
{
    consume_from_buffer(&chan->out, size);
}

static guint16
get_guint16(guint8 **p) {
    guint16 v = 0;
    v = *((*p)++);
    v = *((*p)++) | v << 8;
    return v;
}

static guint32
get_guint32(guint8 **p) {
    guint32 v = 0;
    v = *((*p)++);
    v = *((*p)++) | v << 8;
    v = *((*p)++) | v << 8;
    v = *((*p)++) | v << 8;
    return v;
}


ipc_binary_message_t *
ipc_binary_poll_message(
    ipc_binary_channel_t *chan)
{
    guint8 *p;
    ipc_binary_message_t *msg;
    guint16 magic;
    guint16 cmd_id;
    guint32 length;
    guint16 n_args;

    if (chan->in.length < MSG_HDR_LEN) {
	errno = 0;
	return NULL;
    }

    /* read out the pocket header, using shifts to avoid endian and alignment
     * problems, and checking each one as we proceed */
    p = (guint8 *)(chan->in.buf + chan->in.offset);

    magic = get_guint16(&p);

    if (magic != chan->proto->magic) {
	g_debug("ipc-binary got invalid magic 0x%04x", (int)magic);
	errno = EINVAL;
	return NULL;
    }

    cmd_id = get_guint16(&p);

    /* make sure this is a valid command */
    if (cmd_id <= 0 || cmd_id >= chan->proto->n_cmds
		            || !chan->proto->cmds[cmd_id].exists) {
	errno = EINVAL;
	return NULL;
    }

    length = get_guint32(&p);

    /* see if there's enough data in this buffer for a whole message */
    if (length > chan->in.length) {
	errno = 0;
	return NULL; /* whole packet isn't here yet */
    }

    n_args = get_guint16(&p);

    /* looks legit -- start building a message */
    msg = ipc_binary_new_message(chan, cmd_id);

    /* get each of the arguments */
    while (n_args--) {
	guint16 arg_id;
	guint32 arglen;

	arglen = get_guint32(&p);
	arg_id = get_guint16(&p);

	if (arg_id <= 0 || arg_id >= msg->cmd->n_args
		|| !(msg->cmd->arg_flags[arg_id] & IPC_BINARY_EXISTS)
		|| msg->args[arg_id].data != NULL) {
	    g_debug("ipc-binary invalid or duplicate arg");
	    errno = EINVAL;
	    ipc_binary_free_message(msg);
	    return NULL;
	}

	/* properly terminate string args, but do not include the nul byte in
	 * the arglen */
	if (msg->cmd->arg_flags[arg_id] & IPC_BINARY_STRING) {
	    gchar *data;

	    /* copy and terminate the string */
	    data = g_malloc(arglen+1);
	    g_memmove(data, p, arglen);
	    data[arglen] = '\0';
	    msg->args[arg_id].data = (gpointer)data;
	    msg->args[arg_id].len = arglen;
	} else {
	    msg->args[arg_id].data = g_memdup(p, arglen);
	    msg->args[arg_id].len = arglen;
	}

	p += arglen;
    }

    /* check that all mandatory args are here */
    if (!all_args_present(msg)) {
	errno = EINVAL;
	ipc_binary_free_message(msg);
	return NULL;
    }

    consume_from_buffer(&chan->in, length);

    return msg;
}

static guint8 *
put_guint16(guint8 *p, guint16 v)
{
    *(p++) = v >> 8;
    *(p++) = v;
    return p;
}

static guint8 *
put_guint32(guint8 *p, guint32 v)
{
    *(p++) = v >> 24;
    *(p++) = v >> 16;
    *(p++) = v >> 8;
    *(p++) = v;
    return p;
}

void
ipc_binary_queue_message(
    ipc_binary_channel_t *chan G_GNUC_UNUSED,
    ipc_binary_message_t *msg G_GNUC_UNUSED)
{
    gsize msg_len;
    guint8 *p;
    int i;
    guint16 n_args = 0;

    g_assert(all_args_present(msg));

    /* calculate the length and make enough room in the buffer */
    msg_len = MSG_HDR_LEN;
    for (i = 0; i < msg->cmd->n_args; i++) {
	if (msg->args[i].data) {
	    n_args++;
	    msg_len += msg->args[i].len + ARG_HDR_LEN;
	}
    }
    expand_buffer(&chan->out, msg_len);
    p = (guint8 *)(chan->out.buf + chan->out.offset);

    /* write the packet */
    p = put_guint16(p, chan->proto->magic);
    p = put_guint16(p, msg->cmd_id);
    p = put_guint32(p, msg_len);
    p = put_guint16(p, n_args);

    for (i = 0; i < msg->cmd->n_args; i++) {
	if (!msg->args[i].data)
	    continue;

	p = put_guint32(p, msg->args[i].len);
	p = put_guint16(p, i);

	g_memmove(p, msg->args[i].data, msg->args[i].len);
	p += msg->args[i].len;
    }
    chan->out.length += msg_len;

    ipc_binary_free_message(msg);
}