// XDRStreamMarshaller.cc // -*- mode: c++; c-basic-offset:4 -*- // This file is part of libdap, A C++ implementation of the OPeNDAP Data // Access Protocol. // Copyright (c) 2002,2003,2016 OPeNDAP, Inc. // Author: Patrick West // James Gallagher // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; either // version 2.1 of the License, or (at your option) any later version. // // This library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public // License along with this library; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA // // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112. // (c) COPYRIGHT URI/MIT 1994-1999 // Please read the full copyright statement in the file COPYRIGHT_URI. // // Authors: // pwest Patrick West #include "config.h" #ifdef HAVE_PTHREAD_H #include #endif #include #include #include #include // #define DODS_DEBUG #include "XDRStreamMarshaller.h" #ifdef USE_POSIX_THREADS #include "MarshallerThread.h" #endif #include "Vector.h" #include "XDRUtils.h" #include "util.h" #include "debug.h" using namespace std; // Build this code so it does not use pthreads to write some kinds of // data (see the put_vector() and put_vector_part() methods) in a child thread. // #undef USE_POSIX_THREADS namespace libdap { char *XDRStreamMarshaller::d_buf = 0; static const int XDR_DAP_BUFF_SIZE=256; /** Build an instance of XDRStreamMarshaller. Bind the C++ stream out to this * instance. If the checksum parameter is true, initialize a checksum buffer * and enable the use of the reset_checksum() and get_checksum() methods. * * @param out Write to this stream object. * @param checksum If true, compute checksums. False by default * @param write_data If true, write data values. True by default */ XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) : d_out(out), d_partial_put_byte_count(0), tm(0) { if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE); if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization."); xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE); #ifdef USE_POSIX_THREADS tm = new MarshallerThread; #endif } XDRStreamMarshaller::~XDRStreamMarshaller() { // Added this because when USE_POS... is not defined, 'tm' has no // type, which the compiler complains about. #ifdef USE_POSIX_THREADS delete tm; #endif xdr_destroy(&d_sink); } void XDRStreamMarshaller::put_byte(dods_byte val) { if (!xdr_setpos(&d_sink, 0)) throw Error("Network I/O Error. Could not send byte data - unable to set stream position."); if (!xdr_char(&d_sink, (char *) &val)) throw Error( "Network I/O Error. Could not send byte data."); unsigned int bytes_written = xdr_getpos(&d_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send byte data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(d_buf, bytes_written); } void XDRStreamMarshaller::put_int16(dods_int16 val) { if (!xdr_setpos(&d_sink, 0)) throw Error( "Network I/O Error. Could not send int 16 data - unable to set stream position."); if (!XDR_INT16(&d_sink, &val)) throw Error( "Network I/O Error. Could not send int 16 data."); unsigned int bytes_written = xdr_getpos(&d_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send int 16 data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(d_buf, bytes_written); } void XDRStreamMarshaller::put_int32(dods_int32 val) { if (!xdr_setpos(&d_sink, 0)) throw Error( "Network I/O Error. Could not send int 32 data - unable to set stream position."); if (!XDR_INT32(&d_sink, &val)) throw Error( "Network I/O Error. Culd not read int 32 data."); unsigned int bytes_written = xdr_getpos(&d_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send int 32 data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(d_buf, bytes_written); } void XDRStreamMarshaller::put_float32(dods_float32 val) { if (!xdr_setpos(&d_sink, 0)) throw Error( "Network I/O Error. Could not send float 32 data - unable to set stream position."); if (!xdr_float(&d_sink, &val)) throw Error( "Network I/O Error. Could not send float 32 data."); unsigned int bytes_written = xdr_getpos(&d_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send float 32 data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(d_buf, bytes_written); } void XDRStreamMarshaller::put_float64(dods_float64 val) { if (!xdr_setpos(&d_sink, 0)) throw Error( "Network I/O Error. Could not send float 64 data - unable to set stream position."); if (!xdr_double(&d_sink, &val)) throw Error( "Network I/O Error. Could not send float 64 data."); unsigned int bytes_written = xdr_getpos(&d_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send float 64 data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(d_buf, bytes_written); } void XDRStreamMarshaller::put_uint16(dods_uint16 val) { if (!xdr_setpos(&d_sink, 0)) throw Error( "Network I/O Error. Could not send uint 16 data - unable to set stream position."); if (!XDR_UINT16(&d_sink, &val)) throw Error( "Network I/O Error. Could not send uint 16 data."); unsigned int bytes_written = xdr_getpos(&d_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send uint 16 data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(d_buf, bytes_written); } void XDRStreamMarshaller::put_uint32(dods_uint32 val) { if (!xdr_setpos(&d_sink, 0)) throw Error( "Network I/O Error. Could not send uint 32 data - unable to set stream position."); if (!XDR_UINT32(&d_sink, &val)) throw Error( "Network I/O Error. Could not send uint 32 data."); unsigned int bytes_written = xdr_getpos(&d_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send uint 32 data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(d_buf, bytes_written); } void XDRStreamMarshaller::put_str(const string &val) { int size = val.length() + 8; XDR str_sink; vector str_buf(size); try { xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE); if (!xdr_setpos(&str_sink, 0)) throw Error( "Network I/O Error. Could not send string data - unable to set stream position."); const char *out_tmp = val.c_str(); if (!xdr_string(&str_sink, (char **) &out_tmp, size)) throw Error( "Network I/O Error. Could not send string data."); unsigned int bytes_written = xdr_getpos(&str_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send string data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(&str_buf[0], bytes_written); xdr_destroy(&str_sink); } catch (...) { xdr_destroy(&str_sink); throw; } } void XDRStreamMarshaller::put_url(const string &val) { put_str(val); } void XDRStreamMarshaller::put_opaque(char *val, unsigned int len) { if (len > XDR_DAP_BUFF_SIZE) throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed"); if (!xdr_setpos(&d_sink, 0)) throw Error( "Network I/O Error. Could not send opaque data - unable to set stream position."); if (!xdr_opaque(&d_sink, val, len)) throw Error( "Network I/O Error. Could not send opaque data."); unsigned int bytes_written = xdr_getpos(&d_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send opaque data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(d_buf, bytes_written); } void XDRStreamMarshaller::put_int(int val) { if (!xdr_setpos(&d_sink, 0)) throw Error( "Network I/O Error. Could not send int data - unable to set stream position."); if (!xdr_int(&d_sink, &val)) throw Error( "Network I/O Error(1). Could not send int data."); unsigned int bytes_written = xdr_getpos(&d_sink); if (!bytes_written) throw Error( "Network I/O Error. Could not send int data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif d_out.write(d_buf, bytes_written); } void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec) { put_vector(val, num, width, vec.var()->type()); } /** * Prepare to send a single array/vector using a series of 'put' calls. * * @param num The number of elements in the Array/Vector * @see put_vector_part() * @see put_vector_end() */ void XDRStreamMarshaller::put_vector_start(int num) { put_int(num); put_int(num); d_partial_put_byte_count = 0; } /** * Close a vector when its values are written using put_vector_part(). * * @see put_vector_start() * @see put_vector_part() */ void XDRStreamMarshaller::put_vector_end() { #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); #endif // Compute the trailing (padding) bytes // Note that the XDR standard pads values to 4 byte boundaries. //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4); unsigned int mod_4 = d_partial_put_byte_count & 0x03; unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4; if (pad) { vector padding(4, 0); // 4 zeros d_out.write(&padding[0], pad); if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding"); } } // Start of parallel I/O support. jhrg 8/19/15 void XDRStreamMarshaller::put_vector(char *val, int num, Vector &) { if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set."); // write the number of members of the array being written and then set the position to 0 put_int(num); // this is the word boundary for writing xdr bytes in a vector. const unsigned int add_to = 8; // switch to memory on the heap since the thread will need to access it // after this code returns. char *byte_buf = new char[num + add_to]; XDR byte_sink; try { xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE); if (!xdr_setpos(&byte_sink, 0)) throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position."); if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to)) throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data."); unsigned int bytes_written = xdr_getpos(&byte_sink); if (!bytes_written) throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); tm->increment_child_thread_count(); tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written); xdr_destroy(&byte_sink); #else d_out.write(byte_buf, bytes_written); xdr_destroy(&byte_sink); delete [] byte_buf; #endif } catch (...) { DBG(cerr << "Caught an exception in put_vector_thread" << endl); xdr_destroy(&byte_sink); delete [] byte_buf; throw; } } // private /** * Write elements of a Vector (i.e. an Array) to the stream using XDR encoding. * Encoding is performed on 'num' values that use 'width' bytes. The parameter * 'type' is used to choose the XDR encoding function. * * @param val Pointer to the values to write * @param num The number of elements in the memory referenced by 'val' * @param width The number of bytes in each element * @param type The DAP type of the elements */ void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type) { assert(val || num == 0); // write the number of array members being written, then set the position back to 0 put_int(num); if (num == 0) return; int use_width = width; if (use_width < 4) use_width = 4; // the size is the number of elements num times the width of each // element, then add 4 bytes for the number of elements int size = (num * use_width) + 4; // allocate enough memory for the elements //vector vec_buf(size); char *vec_buf = new char[size]; XDR vec_sink; try { xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE); // set the position of the sink to 0, we're starting at the beginning if (!xdr_setpos(&vec_sink, 0)) throw Error("Network I/O Error. Could not send vector data - unable to set stream position."); // write the array to the buffer if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type))) throw Error("Network I/O Error(2). Could not send vector data - unable to encode."); // how much was written to the buffer unsigned int bytes_written = xdr_getpos(&vec_sink); if (!bytes_written) throw Error("Network I/O Error. Could not send vector data - unable to get stream position."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); tm->increment_child_thread_count(); tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written); xdr_destroy(&vec_sink); #else d_out.write(vec_buf, bytes_written); xdr_destroy(&vec_sink); delete [] vec_buf; #endif } catch (...) { xdr_destroy(&vec_sink); delete [] vec_buf; throw; } } /** * Write num values for an Array/Vector. * * @param val The values to write * @param num the number of values to write * @param width The width of the values * @param type The DAP2 type of the values. * * @see put_vector_start() * @see put_vector_end() */ void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type) { if (width == 1) { // Add space for the 4 bytes of length info and 4 bytes for padding, even though // we will not send either of those. const unsigned int add_to = 8; unsigned int bufsiz = num + add_to; //vector byte_buf(bufsiz); char *byte_buf = new char[bufsiz]; XDR byte_sink; try { xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE); if (!xdr_setpos(&byte_sink, 0)) throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position."); if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz)) throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); tm->increment_child_thread_count(); // Increment the element count so we can figure out about the padding in put_vector_last() d_partial_put_byte_count += num; tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num); xdr_destroy(&byte_sink); #else // Only send the num bytes that follow the 4 bytes of length info - we skip the // length info because it's already been sent and we don't send any trailing padding // bytes in this method (see put_vector_last() for that). d_out.write(byte_buf + 4, num); if (d_out.fail()) throw Error ("Network I/O Error. Could not send initial part of byte vector data"); // Now increment the element count so we can figure out about the padding in put_vector_last() d_partial_put_byte_count += num; xdr_destroy(&byte_sink); delete [] byte_buf; #endif } catch (...) { xdr_destroy(&byte_sink); delete [] byte_buf; throw; } } else { int use_width = (width < 4) ? 4 : width; // the size is the number of elements num times the width of each // element, then add 4 bytes for the (int) number of elements int size = (num * use_width) + 4; // allocate enough memory for the elements //vector vec_buf(size); char *vec_buf = new char[size]; XDR vec_sink; try { xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE); // set the position of the sink to 0, we're starting at the beginning if (!xdr_setpos(&vec_sink, 0)) throw Error("Network I/O Error. Could not send vector data - unable to set stream position."); // write the array to the buffer if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type))) throw Error("Network I/O Error(2). Could not send vector data -unable to encode data."); #ifdef USE_POSIX_THREADS Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); tm->increment_child_thread_count(); // Increment the element count so we can figure out about the padding in put_vector_last() d_partial_put_byte_count += (size - 4); tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4); xdr_destroy(&vec_sink); #else // write that much out to the output stream, skipping the length data that // XDR writes since we have already written the length info using put_vector_start() d_out.write(vec_buf + 4, size - 4); if (d_out.fail()) throw Error ("Network I/O Error. Could not send part of vector data"); // Now increment the element count so we can figure out about the padding in put_vector_last() d_partial_put_byte_count += (size - 4); xdr_destroy(&vec_sink); delete [] vec_buf; #endif } catch (...) { xdr_destroy(&vec_sink); delete [] vec_buf; throw; } } } void XDRStreamMarshaller::dump(ostream &strm) const { strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl; } } // namespace libdap