Blob Blame History Raw
// XDRStreamMarshaller.cc

// -*- mode: c++; c-basic-offset:4 -*-

// This file is part of libdap, A C++ implementation of the OPeNDAP Data
// Access Protocol.

// Copyright (c) 2002,2003,2016 OPeNDAP, Inc.
// Author: Patrick West <pwest@ucar.edu>
//         James Gallagher <jgallagher@opendap.org>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
//
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.

// (c) COPYRIGHT URI/MIT 1994-1999
// Please read the full copyright statement in the file COPYRIGHT_URI.
//
// Authors:
//      pwest       Patrick West <pwest@ucar.edu>


#include "config.h"

#ifdef HAVE_PTHREAD_H
#include <pthread.h>
#endif

#include <cassert>

#include <iostream>
#include <sstream>
#include <iomanip>

// #define DODS_DEBUG

#include "XDRStreamMarshaller.h"
#ifdef USE_POSIX_THREADS
#include "MarshallerThread.h"
#endif
#include "Vector.h"
#include "XDRUtils.h"
#include "util.h"

#include "debug.h"

using namespace std;

// Build this code so it does not use pthreads to write some kinds of
// data (see the put_vector() and put_vector_part() methods) in a child thread.
// #undef USE_POSIX_THREADS

namespace libdap {

char *XDRStreamMarshaller::d_buf = 0;
static const int XDR_DAP_BUFF_SIZE=256;


/** Build an instance of XDRStreamMarshaller. Bind the C++ stream out to this
 * instance. If the checksum parameter is true, initialize a checksum buffer
 * and enable the use of the reset_checksum() and get_checksum() methods.
 *
 * @param out Write to this stream object.
 * @param checksum If true, compute checksums. False by default
 * @param write_data If true, write data values. True by default
 */
XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
    d_out(out), d_partial_put_byte_count(0), tm(0)
{
    if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE);
    if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization.");

    xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);

#ifdef USE_POSIX_THREADS
    tm = new MarshallerThread;
#endif
}

XDRStreamMarshaller::~XDRStreamMarshaller()
{
    // Added this because when USE_POS... is not defined, 'tm' has no
    // type, which the compiler complains about.
#ifdef USE_POSIX_THREADS
    delete tm;
#endif
    xdr_destroy(&d_sink);
}

void XDRStreamMarshaller::put_byte(dods_byte val)
{
     if (!xdr_setpos(&d_sink, 0))
        throw Error("Network I/O Error. Could not send byte data - unable to set stream position.");

    if (!xdr_char(&d_sink, (char *) &val))
        throw Error(
            "Network I/O Error. Could not send byte data.");

    unsigned int bytes_written = xdr_getpos(&d_sink);
    if (!bytes_written)
        throw Error(
            "Network I/O Error. Could not send byte data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    d_out.write(d_buf, bytes_written);
}

void XDRStreamMarshaller::put_int16(dods_int16 val)
{
    if (!xdr_setpos(&d_sink, 0))
        throw Error(
            "Network I/O Error. Could not send int 16 data - unable to set stream position.");

    if (!XDR_INT16(&d_sink, &val))
        throw Error(
            "Network I/O Error. Could not send int 16 data.");

    unsigned int bytes_written = xdr_getpos(&d_sink);
    if (!bytes_written)
        throw Error(
            "Network I/O Error. Could not send int 16 data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    d_out.write(d_buf, bytes_written);
}

void XDRStreamMarshaller::put_int32(dods_int32 val)
{
    if (!xdr_setpos(&d_sink, 0))
        throw Error(
            "Network I/O Error. Could not send int 32 data - unable to set stream position.");

    if (!XDR_INT32(&d_sink, &val))
        throw Error(
            "Network I/O Error. Culd not read int 32 data.");

    unsigned int bytes_written = xdr_getpos(&d_sink);
    if (!bytes_written)
        throw Error(
            "Network I/O Error. Could not send int 32 data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    d_out.write(d_buf, bytes_written);
}

void XDRStreamMarshaller::put_float32(dods_float32 val)
{
    if (!xdr_setpos(&d_sink, 0))
        throw Error(
            "Network I/O Error. Could not send float 32 data - unable to set stream position.");

    if (!xdr_float(&d_sink, &val))
        throw Error(
            "Network I/O Error. Could not send float 32 data.");

    unsigned int bytes_written = xdr_getpos(&d_sink);
    if (!bytes_written)
        throw Error(
            "Network I/O Error. Could not send float 32 data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    d_out.write(d_buf, bytes_written);
}

void XDRStreamMarshaller::put_float64(dods_float64 val)
{
    if (!xdr_setpos(&d_sink, 0))
        throw Error(
            "Network I/O Error. Could not send float 64 data - unable to set stream position.");

    if (!xdr_double(&d_sink, &val))
        throw Error(
            "Network I/O Error. Could not send float 64 data.");

    unsigned int bytes_written = xdr_getpos(&d_sink);
    if (!bytes_written)
        throw Error(
            "Network I/O Error. Could not send float 64 data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    d_out.write(d_buf, bytes_written);
}

void XDRStreamMarshaller::put_uint16(dods_uint16 val)
{
    if (!xdr_setpos(&d_sink, 0))
        throw Error(
            "Network I/O Error. Could not send uint 16 data - unable to set stream position.");

    if (!XDR_UINT16(&d_sink, &val))
        throw Error(
            "Network I/O Error. Could not send uint 16 data.");

    unsigned int bytes_written = xdr_getpos(&d_sink);
    if (!bytes_written)
        throw Error(
            "Network I/O Error. Could not send uint 16 data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    d_out.write(d_buf, bytes_written);
}

void XDRStreamMarshaller::put_uint32(dods_uint32 val)
{
    if (!xdr_setpos(&d_sink, 0))
        throw Error(
            "Network I/O Error. Could not send uint 32 data - unable to set stream position.");

    if (!XDR_UINT32(&d_sink, &val))
        throw Error(
            "Network I/O Error. Could not send uint 32 data.");

    unsigned int bytes_written = xdr_getpos(&d_sink);
    if (!bytes_written)
        throw Error(
            "Network I/O Error. Could not send uint 32 data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    d_out.write(d_buf, bytes_written);
}

void XDRStreamMarshaller::put_str(const string &val)
{
    int size = val.length() + 8;

    XDR str_sink;
    vector<char> str_buf(size);

    try {
        xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE);

        if (!xdr_setpos(&str_sink, 0))
            throw Error(
                "Network I/O Error. Could not send string data - unable to set stream position.");

        const char *out_tmp = val.c_str();
        if (!xdr_string(&str_sink, (char **) &out_tmp, size))
            throw Error(
                "Network I/O Error. Could not send string data.");

        unsigned int bytes_written = xdr_getpos(&str_sink);
        if (!bytes_written)
            throw Error(
                "Network I/O Error. Could not send string data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
        Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

        d_out.write(&str_buf[0], bytes_written);

        xdr_destroy(&str_sink);
    }
    catch (...) {
        xdr_destroy(&str_sink);
        throw;
    }
}

void XDRStreamMarshaller::put_url(const string &val)
{
    put_str(val);
}

void XDRStreamMarshaller::put_opaque(char *val, unsigned int len)
{
    if (len > XDR_DAP_BUFF_SIZE)
        throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");

    if (!xdr_setpos(&d_sink, 0))
        throw Error(
            "Network I/O Error. Could not send opaque data - unable to set stream position.");

    if (!xdr_opaque(&d_sink, val, len))
        throw Error(
            "Network I/O Error. Could not send opaque data.");

    unsigned int bytes_written = xdr_getpos(&d_sink);
    if (!bytes_written)
        throw Error(
            "Network I/O Error. Could not send opaque data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    d_out.write(d_buf, bytes_written);
}

void XDRStreamMarshaller::put_int(int val)
{
    if (!xdr_setpos(&d_sink, 0))
        throw Error(
            "Network I/O Error. Could not send int data - unable to set stream position.");

    if (!xdr_int(&d_sink, &val))
        throw Error(
            "Network I/O Error(1). Could not send int data.");

    unsigned int bytes_written = xdr_getpos(&d_sink);
    if (!bytes_written)
        throw Error(
            "Network I/O Error. Could not send int data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    d_out.write(d_buf, bytes_written);
}

void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec)
{
    put_vector(val, num, width, vec.var()->type());
}


/**
 * Prepare to send a single array/vector using a series of 'put' calls.
 *
 * @param num The number of elements in the Array/Vector
 * @see put_vector_part()
 * @see put_vector_end()
 */
void XDRStreamMarshaller::put_vector_start(int num)
{
    put_int(num);
    put_int(num);

    d_partial_put_byte_count = 0;
}

/**
 * Close a vector when its values are written using put_vector_part().
 *
 * @see put_vector_start()
 * @see put_vector_part()
 */
void XDRStreamMarshaller::put_vector_end()
{
#ifdef USE_POSIX_THREADS
    Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
#endif

    // Compute the trailing (padding) bytes

    // Note that the XDR standard pads values to 4 byte boundaries.
    //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4);
    unsigned int mod_4 = d_partial_put_byte_count & 0x03;
    unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;

    if (pad) {
        vector<char> padding(4, 0); // 4 zeros

        d_out.write(&padding[0], pad);
        if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding");
    }
}

// Start of parallel I/O support. jhrg 8/19/15
void XDRStreamMarshaller::put_vector(char *val, int num, Vector &)
{
    if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set.");

    // write the number of members of the array being written and then set the position to 0
    put_int(num);

    // this is the word boundary for writing xdr bytes in a vector.
    const unsigned int add_to = 8;
    // switch to memory on the heap since the thread will need to access it
    // after this code returns.
    char *byte_buf = new char[num + add_to];
    XDR byte_sink;
    try {
        xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
        if (!xdr_setpos(&byte_sink, 0))
            throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");

        if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to))
            throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");

        unsigned int bytes_written = xdr_getpos(&byte_sink);
        if (!bytes_written)
            throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
        Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
        tm->increment_child_thread_count();
        tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written);
        xdr_destroy(&byte_sink);
#else
        d_out.write(byte_buf, bytes_written);
        xdr_destroy(&byte_sink);
        delete [] byte_buf;
#endif

    }
    catch (...) {
        DBG(cerr << "Caught an exception in put_vector_thread" << endl);
        xdr_destroy(&byte_sink);
        delete [] byte_buf;
        throw;
    }
}

// private
/**
 * Write elements of a Vector (i.e. an Array) to the stream using XDR encoding.
 * Encoding is performed on 'num' values that use 'width' bytes. The parameter
 * 'type' is used to choose the XDR encoding function.
 *
 * @param val Pointer to the values to write
 * @param num The number of elements in the memory referenced by 'val'
 * @param width The number of bytes in each element
 * @param type The DAP type of the elements
 */
void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type)
{
    assert(val || num == 0);

    // write the number of array members being written, then set the position back to 0
    put_int(num);

    if (num == 0)
        return;

    int use_width = width;
    if (use_width < 4) use_width = 4;

    // the size is the number of elements num times the width of each
    // element, then add 4 bytes for the number of elements
    int size = (num * use_width) + 4;

    // allocate enough memory for the elements
    //vector<char> vec_buf(size);
    char *vec_buf = new char[size];
    XDR vec_sink;
    try {
        xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);

        // set the position of the sink to 0, we're starting at the beginning
        if (!xdr_setpos(&vec_sink, 0))
            throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");

        // write the array to the buffer
        if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
            throw Error("Network I/O Error(2). Could not send vector data - unable to encode.");

        // how much was written to the buffer
        unsigned int bytes_written = xdr_getpos(&vec_sink);
        if (!bytes_written)
            throw Error("Network I/O Error. Could not send vector data - unable to get stream position.");

#ifdef USE_POSIX_THREADS
        Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
        tm->increment_child_thread_count();
        tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written);
        xdr_destroy(&vec_sink);
#else
        d_out.write(vec_buf, bytes_written);
        xdr_destroy(&vec_sink);
        delete [] vec_buf;
#endif
    }
    catch (...) {
        xdr_destroy(&vec_sink);
        delete [] vec_buf;
        throw;
    }
}

/**
 * Write num values for an Array/Vector.
 *
 * @param val The values to write
 * @param num the number of values to write
 * @param width The width of the values
 * @param type The DAP2 type of the values.
 *
 * @see put_vector_start()
 * @see put_vector_end()
 */
void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type)
{
    if (width == 1) {
        // Add space for the 4 bytes of length info and 4 bytes for padding, even though
        // we will not send either of those.
        const unsigned int add_to = 8;
        unsigned int bufsiz = num + add_to;
        //vector<char> byte_buf(bufsiz);
        char *byte_buf = new char[bufsiz];
        XDR byte_sink;
        try {
            xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
            if (!xdr_setpos(&byte_sink, 0))
                throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");

            if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz))
                throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");

#ifdef USE_POSIX_THREADS
            Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
            tm->increment_child_thread_count();

            // Increment the element count so we can figure out about the padding in put_vector_last()
            d_partial_put_byte_count += num;

            tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num);
            xdr_destroy(&byte_sink);
#else
            // Only send the num bytes that follow the 4 bytes of length info - we skip the
            // length info because it's already been sent and we don't send any trailing padding
            // bytes in this method (see put_vector_last() for that).
            d_out.write(byte_buf + 4, num);

            if (d_out.fail())
            throw Error ("Network I/O Error. Could not send initial part of byte vector data");

            // Now increment the element count so we can figure out about the padding in put_vector_last()
            d_partial_put_byte_count += num;

            xdr_destroy(&byte_sink);
            delete [] byte_buf;
#endif
        }
        catch (...) {
            xdr_destroy(&byte_sink);
            delete [] byte_buf;
            throw;
        }
    }
    else {
        int use_width = (width < 4) ? 4 : width;

        // the size is the number of elements num times the width of each
        // element, then add 4 bytes for the (int) number of elements
        int size = (num * use_width) + 4;

        // allocate enough memory for the elements
        //vector<char> vec_buf(size);
        char *vec_buf = new char[size];
        XDR vec_sink;
        try {
            xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);

            // set the position of the sink to 0, we're starting at the beginning
            if (!xdr_setpos(&vec_sink, 0))
                throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");

            // write the array to the buffer
            if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
                throw Error("Network I/O Error(2). Could not send vector data -unable to encode data.");

#ifdef USE_POSIX_THREADS
            Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
            tm->increment_child_thread_count();

            // Increment the element count so we can figure out about the padding in put_vector_last()
            d_partial_put_byte_count += (size - 4);
            tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4);
            xdr_destroy(&vec_sink);
#else
            // write that much out to the output stream, skipping the length data that
            // XDR writes since we have already written the length info using put_vector_start()
            d_out.write(vec_buf + 4, size - 4);

            if (d_out.fail())
                throw Error ("Network I/O Error. Could not send part of vector data");

            // Now increment the element count so we can figure out about the padding in put_vector_last()
            d_partial_put_byte_count += (size - 4);

            xdr_destroy(&vec_sink);
            delete [] vec_buf;
#endif
        }
        catch (...) {
            xdr_destroy(&vec_sink);
            delete [] vec_buf;
            throw;
        }
    }
}

void XDRStreamMarshaller::dump(ostream &strm) const
{
    strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl;
}

} // namespace libdap