Blob Blame History Raw
// -*- mode: c++; c-basic-offset:4 -*-

// This file is part of libdap, A C++ implementation of the OPeNDAP Data
// Access Protocol.

// Copyright (c) 2015 OPeNDAP, Inc.
// Author: James Gallagher <jgallagher@opendap.org>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
//
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.

/*
 * MarshallerThread.cc
 *
 *  Created on: Aug 27, 2015
 *      Author: jimg
 */

#include "config.h"

#include <pthread.h>
#include <sys/time.h>
#include <fcntl.h>
#include <unistd.h>

#include <ostream>
#include <sstream>

#include "MarshallerThread.h"
#include "Error.h"
#include "InternalErr.h"
#include "debug.h"

using namespace libdap;
using namespace std;

#if 0
bool MarshallerThread::print_time = false;

/**
 * Use this with timeval structures returned by gettimeofday() to compute
 * real time (instead of user time that is returned by std::clock() or
 * get_rusage()).
 */
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start)
{
    /* Perform the carry for the later subtraction by updating y. */
    if (stop->tv_usec < start->tv_usec) {
        int nsec = (start->tv_usec - stop->tv_usec) / 1000000 + 1;
        start->tv_usec -= 1000000 * nsec;
        start->tv_sec += nsec;
    }
    if (stop->tv_usec - start->tv_usec > 1000000) {
        int nsec = (start->tv_usec - stop->tv_usec) / 1000000;
        start->tv_usec += 1000000 * nsec;
        start->tv_sec -= nsec;
    }

    double result = stop->tv_sec - start->tv_sec;
    result += double(stop->tv_usec - start->tv_usec) / 1000000;
    return result;
}
#endif


/**
 * Lock the mutex then wait for the child thread to signal using the
 * condition variable 'cond'. Once the signal is received, re-test count
 * to make sure it's zero (there are no child threads).
 *
 * This is used to lock the main thread and ensure that a second child
 * (writer) thread is not started until any current child thread completes,
 * which keeps the write operations in the correct order.
 */
Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
    m_mutex(lock)
{
    int status = pthread_mutex_lock(&m_mutex);

    DBG(cerr << "Locking the mutex! (waiting; " << pthread_self() << ")" << endl);

    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
    while (count != 0) {
        status = pthread_cond_wait(&cond, &m_mutex);
        if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
    }
    if (count != 0) throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");

    DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
}

/**
 * Unlock the mutex
 */
Locker::~Locker()
{
    DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);

    int status = pthread_mutex_unlock(&m_mutex);
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
}


/**
 * Lock the mutex, but do not wait on the condition variable.
 * This is used by the child thread; it helps ensure that the
 * mutex is unlocked and the predicate is reset no matter how the
 * child thread is exited.
 *
 * Note we how a reference to the shared 'count' predicate that
 * tells how many (0 or 1) child threads exist so that when this
 * version of the Locker object is destroyed, we can zero that.
 * This enables us to use RAII in the child thread and ensure
 * the invariant if there is an error and the code exits with a
 * summary return.
 */
ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
    m_mutex(lock), m_cond(cond), m_count(count)
{
    int status = pthread_mutex_lock(&m_mutex);

    DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);

    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");

    DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
}

ChildLocker::~ChildLocker()
{
    DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);

    m_count = 0;
    int status = pthread_cond_signal(&m_cond);
    if (status != 0)
        throw InternalErr(__FILE__, __LINE__, "Could not signal main thread from ChildLocker!");

    status = pthread_mutex_unlock(&m_mutex);
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
}

MarshallerThread::MarshallerThread() :
    d_thread(0), d_child_thread_count(0)
{
    if (pthread_attr_init(&d_thread_attr) != 0) throw Error(internal_error, "Failed to initialize pthread attributes.");
    if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0)
        throw Error(internal_error, "Failed to complete pthread attribute initialization.");

    if (pthread_mutex_init(&d_out_mutex, 0) != 0) throw Error(internal_error, "Failed to initialize mutex.");
    if (pthread_cond_init(&d_out_cond, 0) != 0) throw Error(internal_error, "Failed to initialize cond.");
}

MarshallerThread::~MarshallerThread()
{
    int status = pthread_mutex_lock(&d_out_mutex);
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
    while (d_child_thread_count != 0) {
        status = pthread_cond_wait(&d_out_cond, &d_out_mutex);
        if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
    }
    if (d_child_thread_count != 0)
        throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");

    status = pthread_mutex_unlock(&d_out_mutex);
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");

    pthread_mutex_destroy(&d_out_mutex);
    pthread_cond_destroy(&d_out_cond);

    pthread_attr_destroy(&d_thread_attr);
}

// not a static method
/**
 * Start the child thread, using the arguments given. This will write 'bytes'
 * bytes from 'byte_buf' to the output stream 'out'
 *
 */
void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf,
    unsigned int bytes)
{
    write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf,
        bytes);
    int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
}

/**
 * Write 'bytes' bytes from 'byte_buf' to the file descriptor 'fd'.
 */
void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes)
{
    write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf,
        bytes);
    int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
}

/**
 * This static method is used to write data to the ostream referenced
 * by the ostream element of write_args. This is used by start_thread()
 * and passed to pthread_create()
 *
 * @note The write_args argument may contain either a file descriptor
 * (d_out_file) or an ostream& (d_out). If the file descriptor is not
 * -1, then use that, else use the ostream reference.
 */
void *
MarshallerThread::write_thread(void *arg)
{
    write_args *args = reinterpret_cast<write_args *>(arg);

    ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit

#if 0
    struct timeval tp_s;
    if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl;
#endif

    // force an error
    // return (void*)-1;

    if (args->d_out_file != -1) {
        int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
        if (bytes_written != args->d_num)
            return (void*) -1;
    }
    else {
        args->d_out.write(args->d_buf, args->d_num);
        if (args->d_out.fail()) {
            ostringstream oss;
            oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
            args->d_error = oss.str();
            return (void*) -1;
        }
    }

    delete [] args->d_buf;
    delete args;

#if 0
    struct timeval tp_e;
    if (print_time) {
        if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl;

        cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
    }
#endif

    return 0;
}

/**
 * This static method is used to write data to the ostream referenced
 * by the ostream element of write_args. This is used by start_thread()
 * and passed to pthread_create()
 *
 * @note This differers from MarshallerThread::write_thread() in that it
 * writes data starting _after_ the four-byte length prefix that XDR
 * adds to the data. It is used for the put_vector_part() calls in
 * XDRStreamMarshaller.
 *
 * @return 0 if successful, -1 otherwise.
 */
void *
MarshallerThread::write_thread_part(void *arg)
{
    write_args *args = reinterpret_cast<write_args *>(arg);

    ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit

    if (args->d_out_file != -1) {
        int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
        if (bytes_written != args->d_num) return (void*) -1;
    }
    else {
        args->d_out.write(args->d_buf + 4, args->d_num);
        if (args->d_out.fail()) {
            ostringstream oss;
            oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
            args->d_error = oss.str();
            return (void*) -1;
        }
    }

    delete [] args->d_buf;
    delete args;

    return 0;
}