// -*- mode: c++; c-basic-offset:4 -*-

// This file is part of libdap, A C++ implementation of the OPeNDAP Data
// Access Protocol.

// Copyright (c) 2015 OPeNDAP, Inc.
// Author: James Gallagher <jgallagher@opendap.org>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
//
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.

/*
 * MarshallerThread.h
 *
 *  Created on: Aug 27, 2015
 *      Author: jimg
 */

#ifndef MARSHALLERTHREAD_H_
#define MARSHALLERTHREAD_H_

#include <pthread.h>

#include <iostream>
#include <ostream>
#include <string>

namespace libdap {

/**
 * RAII for the MarshallerThread mutex and condition variable. Used by the
 * Main thread. The constructor locks the mutex and then, if the count of
 * child threads is not zero, blocks on the associated condition variable.
 * When signaled by the child thread using the condition variable (the child
 * thread count should then be zero), the mutex is (re)locked and the ctor
 * returns. The destructor unlocks the mutex.
 */
class Locker {
private:
    // Default construction and copying are disabled by declaring these
    // private; a Locker can only be built from a mutex, a condition
    // variable and a child-thread count.
    Locker();
    Locker(const Locker &rhs);

    // The mutex this object was constructed with.
    pthread_mutex_t &m_mutex;

public:
    /**
     * @param lock  The mutex to lock.
     * @param cond  The condition variable associated with the mutex.
     * @param count The count of child threads.
     */
    Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count);

    virtual ~Locker();
};

/**
 * Synchronization for the child thread in the multi-threaded version of
 * the DAP2 and DAP4 (once it is implemented) 'Marshaller' class used to
 * send data. The class declared below (MarshallerThread) manages the
 * child thread.
 *
 * The ctor of this class simply locks the mutex; the dtor clears the
 * child thread count, signals that the count has changed and unlocks
 * the mutex.
 */
class ChildLocker {
public:
    /**
     * @param lock  The mutex to lock.
     * @param cond  The condition variable associated with the mutex.
     * @param count The count of child threads.
     */
    ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count);
    virtual ~ChildLocker();

private:
    pthread_mutex_t& m_mutex;   // the mutex passed to the ctor
    pthread_cond_t& m_cond;     // the condition variable passed to the ctor
    int& m_count;               // the child thread count passed to the ctor

    // Default construction and copying are disabled. The copy constructor
    // must take a 'const ChildLocker &'; a constructor taking some other
    // class type is only a converting constructor and would leave the
    // compiler-generated public copy constructor in place. Copy assignment
    // is already unavailable because the class has reference members.
    ChildLocker();
    ChildLocker(const ChildLocker &rhs);
};

/**
 * Implement a multi-threaded data transmission sub-system for libdap.
 * This class makes it fairly painless to send data using a child thread
 * so that the main thread can be used to read the next chunk of data
 * while whatever has been read to this point is sent over the wire.
 *
 * This code is used by XDRStreamMarshaller and (soon) D4StreamMarshaller.
 */
class MarshallerThread {
private:
    pthread_t d_thread;
    pthread_attr_t d_thread_attr;

    pthread_mutex_t d_out_mutex;
    pthread_cond_t d_out_cond;

    int d_child_thread_count;   // 0 or 1
    std::string d_thread_error; // non-empty indicates an error

    // Not copyable: this object holds a pthread mutex and condition variable
    // by value, and POSIX leaves the behavior of using copies of those
    // objects undefined. Declared private and intentionally not defined.
    MarshallerThread(const MarshallerThread &rhs);
    MarshallerThread &operator=(const MarshallerThread &rhs);

#if 0
    static bool print_time; // false by default
#endif

    /**
     * Used to pass information into the static methods that run the
     * simple stream writer threads. This can pass either an ostream or
     * a file descriptor. If a fd is passed, the ostream reference is
     * set to stderr (i.e., std::cerr).
     */
    struct write_args {
        pthread_mutex_t &d_mutex;
        pthread_cond_t &d_cond;
        int &d_count;
        std::string &d_error;
        std::ostream &d_out;     // The output stream protected by the mutex, ...
        int d_out_file;       // file descriptor; if not -1, use this.
        char *d_buf;        // The data to write to the stream
        int d_num;          // The size of d_buf

        /**
         * Build args for an ostream. The file descriptor is set to -1.
         *
         * @param m     Mutex shared with the child thread.
         * @param c     Condition variable shared with the child thread.
         * @param count Child thread count.
         * @param e     String that holds an error message.
         * @param s     The output stream.
         * @param vals  The data to write.
         * @param num   The size of vals.
         */
        write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, std::ostream &s, char *vals, int num) :
            d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(s), d_out_file(-1), d_buf(vals), d_num(num)
        {
        }

        /**
         * Build args for a file descriptor. The ostream is set to cerr (because it is
         * a reference and has to be initialized to something).
         *
         * @param m     Mutex shared with the child thread.
         * @param c     Condition variable shared with the child thread.
         * @param count Child thread count.
         * @param e     String that holds an error message.
         * @param fd    The file descriptor to write to.
         * @param vals  The data to write.
         * @param num   The size of vals.
         */
        write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, int fd, char *vals, int num) :
            d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(std::cerr), d_out_file(fd), d_buf(vals), d_num(num)
        {
        }
    };

public:
    MarshallerThread();
    virtual ~MarshallerThread();

    /// @return A reference to the mutex owned by this object.
    pthread_mutex_t &get_mutex() { return d_out_mutex; }
    /// @return A reference to the condition variable owned by this object.
    pthread_cond_t &get_cond() { return d_out_cond; }

    /// @return A reference to (not a copy of) the child thread count.
    int &get_child_thread_count() { return d_child_thread_count; }
    /// Add one to the child thread count. No locking is done here.
    void increment_child_thread_count() { ++d_child_thread_count; }

    // NOTE(review): the definitions are not in this header; presumably each
    // overload starts a child thread running 'thread' to write 'byte_buf'
    // ('bytes_written' bytes) to the ostream or file descriptor - confirm
    // against the implementation file.
    void start_thread(void* (*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written);
    void start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes_written);

    // These are static member functions so that they can be passed to
    // pthread_create() as the thread start routine; a non-static member
    // function cannot be used because of its implicit 'this' argument.
    // (Static members do not actually have C language linkage.)
    static void *write_thread(void *arg);
    static void *write_thread_part(void *arg);

#if 0
    static void set_print_time(bool state) { print_time = state; }
    static bool get_print_time() { return print_time; }
#endif

};

}

#endif /* MARSHALLERTHREAD_H_ */