// -*- mode: c++; c-basic-offset:4 -*- // This file is part of libdap, A C++ implementation of the OPeNDAP Data // Access Protocol. // Copyright (c) 2015 OPeNDAP, Inc. // Author: James Gallagher // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; either // version 2.1 of the License, or (at your option) any later version. // // This library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public // License along with this library; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA // // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112. /* * MarshallerThread.h * * Created on: Aug 27, 2015 * Author: jimg */ #ifndef MARSHALLERTHREAD_H_ #define MARSHALLERTHREAD_H_ #include #include #include #include namespace libdap { /** * RAII for the MarshallerThread mutex and condition variable. Used by the * Main thread. The constructor locks the mutex and then, if the count of * child threads is not zero, blocks on the associated condition variable. * When signaled by the child thread using the condition variable (the child * thread count should then be zero), the mutex is (re)locked and the ctor * returns. The destructor unlocks the mutex. */ class Locker { public: Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count); virtual ~Locker(); private: pthread_mutex_t& m_mutex; Locker(); Locker(const Locker &rhs); }; /** * Synchronization for the child thread in the multi-threaded version of * the DAP2 and DAP4 (when it gets implement) 'Marshaller' class used to * send data. The class declared below (MarshallerThread) manages the * child thread. * * The ctor of this class simply * locks the mutex; the dtor clears the child thread count, signals that * count has changed and unlocks the mutex. */ class ChildLocker { public: ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count); virtual ~ChildLocker(); private: pthread_mutex_t& m_mutex; pthread_cond_t& m_cond; int& m_count; ChildLocker(); ChildLocker(const Locker &rhs); }; /** * Implement a multi-threaded data transmission sub-system for libdap. * This class makes it fairly painless to send data using a child thread * so that the main thread can be used to read the next chunk of data * while whatever has been read to this point is sent over the wire. * * This code is used by XDRStreamMarshaller and (soon) D4StreamMarshaller. */ class MarshallerThread { private: pthread_t d_thread; pthread_attr_t d_thread_attr; pthread_mutex_t d_out_mutex; pthread_cond_t d_out_cond; int d_child_thread_count; // 0 or 1 std::string d_thread_error; // non-null indicates an error #if 0 static bool print_time; // false by default #endif /** * Used to pass information into the static methods that run the * simple stream writer threads. This can pass both an ostream or * a file descriptor. If a fd is passed, the ostream reference is * set to stderr (i.e., std::cerr). */ struct write_args { pthread_mutex_t &d_mutex; pthread_cond_t &d_cond; int &d_count; std::string &d_error; std::ostream &d_out; // The output stream protected by the mutex, ... int d_out_file; // file descriptor; if not -1, use this. char *d_buf; // The data to write to the stream int d_num; // The size of d_buf /** * Build args for an ostream. The file descriptor is set to -1 */ write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, std::ostream &s, char *vals, int num) : d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(s), d_out_file(-1), d_buf(vals), d_num(num) { } /** * Build args for a file descriptr. The ostream is set to cerr (because it is * a reference and has to be initialized to something). */ write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, int fd, char *vals, int num) : d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(std::cerr), d_out_file(fd), d_buf(vals), d_num(num) { } }; public: MarshallerThread(); virtual ~MarshallerThread(); pthread_mutex_t &get_mutex() { return d_out_mutex; } pthread_cond_t &get_cond() { return d_out_cond; } int &get_child_thread_count() { return d_child_thread_count; } void increment_child_thread_count() { ++d_child_thread_count; } void start_thread(void* (*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written); void start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes_written); // These are static so they will have c-linkage - required because they // are passed to pthread_create() static void *write_thread(void *arg); static void *write_thread_part(void *arg); #if 0 static void set_print_time(bool state) { print_time = state; } static bool get_print_time() { return print_time; } #endif }; } #endif /* MARSHALLERTHREAD_H_ */