Blame MarshallerThread.cc

Packit a4aae4
// -*- mode: c++; c-basic-offset:4 -*-
Packit a4aae4
Packit a4aae4
// This file is part of libdap, A C++ implementation of the OPeNDAP Data
Packit a4aae4
// Access Protocol.
Packit a4aae4
Packit a4aae4
// Copyright (c) 2015 OPeNDAP, Inc.
Packit a4aae4
// Author: James Gallagher <jgallagher@opendap.org>
Packit a4aae4
//
Packit a4aae4
// This library is free software; you can redistribute it and/or
Packit a4aae4
// modify it under the terms of the GNU Lesser General Public
Packit a4aae4
// License as published by the Free Software Foundation; either
Packit a4aae4
// version 2.1 of the License, or (at your option) any later version.
Packit a4aae4
//
Packit a4aae4
// This library is distributed in the hope that it will be useful,
Packit a4aae4
// but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit a4aae4
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Packit a4aae4
// Lesser General Public License for more details.
Packit a4aae4
//
Packit a4aae4
// You should have received a copy of the GNU Lesser General Public
Packit a4aae4
// License along with this library; if not, write to the Free Software
Packit a4aae4
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
Packit a4aae4
//
Packit a4aae4
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
Packit a4aae4
Packit a4aae4
/*
Packit a4aae4
 * MarshallerThread.cc
Packit a4aae4
 *
Packit a4aae4
 *  Created on: Aug 27, 2015
Packit a4aae4
 *      Author: jimg
Packit a4aae4
 */
Packit a4aae4
Packit a4aae4
#include "config.h"
Packit a4aae4
Packit a4aae4
#include <pthread.h>
Packit a4aae4
#include <sys/time.h>
Packit a4aae4
#include <fcntl.h>
Packit a4aae4
#include <unistd.h>
Packit a4aae4
Packit a4aae4
#include <ostream>
Packit a4aae4
#include <sstream>
Packit a4aae4
Packit a4aae4
#include "MarshallerThread.h"
Packit a4aae4
#include "Error.h"
Packit a4aae4
#include "InternalErr.h"
Packit a4aae4
#include "debug.h"
Packit a4aae4
Packit a4aae4
using namespace libdap;
Packit a4aae4
using namespace std;
Packit a4aae4
Packit a4aae4
#if 0
Packit a4aae4
bool MarshallerThread::print_time = false;
Packit a4aae4
Packit a4aae4
/**
 * Compute the elapsed real time between two timeval structures such as
 * those filled in by gettimeofday() (as opposed to the user time that is
 * returned by std::clock() or get_rusage()).
 *
 * Neither argument is modified. The seconds and microseconds fields are
 * subtracted separately and combined in floating point, so no integer
 * carry/borrow step is needed even when stop->tv_usec < start->tv_usec.
 *
 * @note Despite the name, the value returned is in seconds (with a
 * fractional part), not in hundredths of a second.
 *
 * @param stop The later time
 * @param start The earlier time
 * @return stop - start, in seconds
 */
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start)
{
    const double whole_seconds = static_cast<double>(stop->tv_sec - start->tv_sec);

    // May be negative; that simply reduces the total computed below.
    const double micro_seconds = static_cast<double>(stop->tv_usec - start->tv_usec);

    return whole_seconds + micro_seconds / 1000000;
}
Packit a4aae4
#endif
Packit a4aae4
Packit a4aae4
Packit a4aae4
/**
Packit a4aae4
 * Lock the mutex then wait for the child thread to signal using the
Packit a4aae4
 * condition variable 'cond'. Once the signal is received, re-test count
Packit a4aae4
 * to make sure it's zero (there are no child threads).
Packit a4aae4
 *
Packit a4aae4
 * This is used to lock the main thread and ensure that a second child
Packit a4aae4
 * (writer) thread is not started until any current child thread completes,
Packit a4aae4
 * which keeps the write operations in the correct order.
Packit a4aae4
 */
Packit a4aae4
Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
Packit a4aae4
    m_mutex(lock)
Packit a4aae4
{
Packit a4aae4
    int status = pthread_mutex_lock(&m_mutex);
Packit a4aae4
Packit a4aae4
    DBG(cerr << "Locking the mutex! (waiting; " << pthread_self() << ")" << endl);
Packit a4aae4
Packit a4aae4
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
Packit a4aae4
    while (count != 0) {
Packit a4aae4
        status = pthread_cond_wait(&cond, &m_mutex);
Packit a4aae4
        if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
Packit a4aae4
    }
Packit a4aae4
    if (count != 0) throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
Packit a4aae4
Packit a4aae4
    DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
Packit a4aae4
}
Packit a4aae4
Packit a4aae4
/**
Packit a4aae4
 * Unlock the mutex
Packit a4aae4
 */
Packit a4aae4
Locker::~Locker()
Packit a4aae4
{
Packit a4aae4
    DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
Packit a4aae4
Packit a4aae4
    int status = pthread_mutex_unlock(&m_mutex);
Packit a4aae4
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
Packit a4aae4
}
Packit a4aae4
Packit a4aae4
Packit a4aae4
/**
Packit a4aae4
 * Lock the mutex, but do not wait on the condition variable.
Packit a4aae4
 * This is used by the child thread; it helps ensure that the
Packit a4aae4
 * mutex is unlocked and the predicate is reset no matter how the
Packit a4aae4
 * child thread is exited.
Packit a4aae4
 *
Packit a4aae4
 * Note we how a reference to the shared 'count' predicate that
Packit a4aae4
 * tells how many (0 or 1) child threads exist so that when this
Packit a4aae4
 * version of the Locker object is destroyed, we can zero that.
Packit a4aae4
 * This enables us to use RAII in the child thread and ensure
Packit a4aae4
 * the invariant if there is an error and the code exits with a
Packit a4aae4
 * summary return.
Packit a4aae4
 */
Packit a4aae4
ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
Packit a4aae4
    m_mutex(lock), m_cond(cond), m_count(count)
Packit a4aae4
{
Packit a4aae4
    int status = pthread_mutex_lock(&m_mutex);
Packit a4aae4
Packit a4aae4
    DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
Packit a4aae4
Packit a4aae4
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
Packit a4aae4
Packit a4aae4
    DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
Packit a4aae4
}
Packit a4aae4
Packit a4aae4
/**
 * Zero the shared child-thread count, signal the condition variable and
 * unlock the mutex.
 *
 * Both the signal and the unlock are always attempted: a failure to signal
 * must not leave the mutex locked. Failures are reported only through the
 * DBG() macro; nothing is thrown because an exception that leaves a
 * destructor terminates the program when the destructor is non-throwing
 * (the C++11 default) or when it runs during stack unwinding.
 */
ChildLocker::~ChildLocker()
{
    DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);

    m_count = 0;

    int status = pthread_cond_signal(&m_cond);
    if (status != 0) {
        DBG(cerr << "Could not signal main thread from ChildLocker! (status: " << status << ")" << endl);
    }

    status = pthread_mutex_unlock(&m_mutex);
    if (status != 0) {
        DBG(cerr << "Could not unlock m_mutex (status: " << status << ")" << endl);
    }
}
Packit a4aae4
Packit a4aae4
/**
 * Initialize the thread attributes (child threads are created detached),
 * the output mutex and the output condition variable.
 *
 * If any step fails, the objects initialized by the earlier steps are
 * destroyed before the exception is thrown, because the destructor is not
 * run for an object whose constructor throws.
 *
 * @exception Error (internal_error) if any pthread initialization call fails
 */
MarshallerThread::MarshallerThread() :
    d_thread(0), d_child_thread_count(0)
{
    if (pthread_attr_init(&d_thread_attr) != 0) throw Error(internal_error, "Failed to initialize pthread attributes.");

    if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0) {
        (void) pthread_attr_destroy(&d_thread_attr);
        throw Error(internal_error, "Failed to complete pthread attribute initialization.");
    }

    if (pthread_mutex_init(&d_out_mutex, 0) != 0) {
        (void) pthread_attr_destroy(&d_thread_attr);
        throw Error(internal_error, "Failed to initialize mutex.");
    }

    if (pthread_cond_init(&d_out_cond, 0) != 0) {
        (void) pthread_mutex_destroy(&d_out_mutex);
        (void) pthread_attr_destroy(&d_thread_attr);
        throw Error(internal_error, "Failed to initialize cond.");
    }
}
Packit a4aae4
Packit a4aae4
/**
 * Wait until the child thread count is zero, then destroy the mutex, the
 * condition variable and the thread attributes.
 *
 * pthread failures are reported only through the DBG() macro. Nothing is
 * thrown because an exception that leaves a destructor terminates the
 * program when the destructor is non-throwing (the C++11 default) or when
 * it runs during stack unwinding. The destroy calls are made even if the
 * lock or the wait fails.
 */
MarshallerThread::~MarshallerThread()
{
    int status = pthread_mutex_lock(&d_out_mutex);
    if (status == 0) {
        // Loop on the predicate: a condition wait can return while the count is still non-zero.
        while (d_child_thread_count != 0) {
            status = pthread_cond_wait(&d_out_cond, &d_out_mutex);
            if (status != 0) {
                DBG(cerr << "Could not wait on m_cond (status: " << status << ")" << endl);
                break;
            }
        }

        status = pthread_mutex_unlock(&d_out_mutex);
        if (status != 0) {
            DBG(cerr << "Could not unlock m_mutex (status: " << status << ")" << endl);
        }
    }
    else {
        DBG(cerr << "Could not lock m_mutex (status: " << status << ")" << endl);
    }

    pthread_mutex_destroy(&d_out_mutex);
    pthread_cond_destroy(&d_out_cond);

    pthread_attr_destroy(&d_thread_attr);
}
Packit a4aae4
Packit a4aae4
// not a static method
Packit a4aae4
/**
Packit a4aae4
 * Start the child thread, using the arguments given. This will write 'bytes'
Packit a4aae4
 * bytes from 'byte_buf' to the output stream 'out'
Packit a4aae4
 *
Packit a4aae4
 */
Packit a4aae4
void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf,
Packit a4aae4
    unsigned int bytes)
Packit a4aae4
{
Packit a4aae4
    write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf,
Packit a4aae4
        bytes);
Packit a4aae4
    int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
Packit a4aae4
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
Packit a4aae4
}
Packit a4aae4
Packit a4aae4
/**
Packit a4aae4
 * Write 'bytes' bytes from 'byte_buf' to the file descriptor 'fd'.
Packit a4aae4
 */
Packit a4aae4
void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes)
Packit a4aae4
{
Packit a4aae4
    write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf,
Packit a4aae4
        bytes);
Packit a4aae4
    int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
Packit a4aae4
    if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
Packit a4aae4
}
Packit a4aae4
Packit a4aae4
/**
Packit a4aae4
 * This static method is used to write data to the ostream referenced
Packit a4aae4
 * by the ostream element of write_args. This is used by start_thread()
Packit a4aae4
 * and passed to pthread_create()
Packit a4aae4
 *
Packit a4aae4
 * @note The write_args argument may contain either a file descriptor
Packit a4aae4
 * (d_out_file) or an ostream& (d_out). If the file descriptor is not
Packit a4aae4
 * -1, then use that, else use the ostream reference.
Packit a4aae4
 */
Packit a4aae4
void *
Packit a4aae4
MarshallerThread::write_thread(void *arg)
Packit a4aae4
{
Packit a4aae4
    write_args *args = reinterpret_cast<write_args *>(arg);
Packit a4aae4
Packit a4aae4
    ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
Packit a4aae4
Packit a4aae4
#if 0
Packit a4aae4
    struct timeval tp_s;
Packit a4aae4
    if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl;
Packit a4aae4
#endif
Packit a4aae4
Packit a4aae4
    // force an error
Packit a4aae4
    // return (void*)-1;
Packit a4aae4
Packit a4aae4
    if (args->d_out_file != -1) {
Packit a4aae4
        int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
Packit a4aae4
        if (bytes_written != args->d_num)
Packit a4aae4
            return (void*) -1;
Packit a4aae4
    }
Packit a4aae4
    else {
Packit a4aae4
        args->d_out.write(args->d_buf, args->d_num);
Packit a4aae4
        if (args->d_out.fail()) {
Packit a4aae4
            ostringstream oss;
Packit a4aae4
            oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
Packit a4aae4
            args->d_error = oss.str();
Packit a4aae4
            return (void*) -1;
Packit a4aae4
        }
Packit a4aae4
    }
Packit a4aae4
Packit a4aae4
    delete [] args->d_buf;
Packit a4aae4
    delete args;
Packit a4aae4
Packit a4aae4
#if 0
Packit a4aae4
    struct timeval tp_e;
Packit a4aae4
    if (print_time) {
Packit a4aae4
        if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl;
Packit a4aae4
Packit a4aae4
        cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
Packit a4aae4
    }
Packit a4aae4
#endif
Packit a4aae4
Packit a4aae4
    return 0;
Packit a4aae4
}
Packit a4aae4
Packit a4aae4
/**
Packit a4aae4
 * This static method is used to write data to the ostream referenced
Packit a4aae4
 * by the ostream element of write_args. This is used by start_thread()
Packit a4aae4
 * and passed to pthread_create()
Packit a4aae4
 *
Packit a4aae4
 * @note This differers from MarshallerThread::write_thread() in that it
Packit a4aae4
 * writes data starting _after_ the four-byte length prefix that XDR
Packit a4aae4
 * adds to the data. It is used for the put_vector_part() calls in
Packit a4aae4
 * XDRStreamMarshaller.
Packit a4aae4
 *
Packit a4aae4
 * @return 0 if successful, -1 otherwise.
Packit a4aae4
 */
Packit a4aae4
void *
Packit a4aae4
MarshallerThread::write_thread_part(void *arg)
Packit a4aae4
{
Packit a4aae4
    write_args *args = reinterpret_cast<write_args *>(arg);
Packit a4aae4
Packit a4aae4
    ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
Packit a4aae4
Packit a4aae4
    if (args->d_out_file != -1) {
Packit a4aae4
        int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
Packit a4aae4
        if (bytes_written != args->d_num) return (void*) -1;
Packit a4aae4
    }
Packit a4aae4
    else {
Packit a4aae4
        args->d_out.write(args->d_buf + 4, args->d_num);
Packit a4aae4
        if (args->d_out.fail()) {
Packit a4aae4
            ostringstream oss;
Packit a4aae4
            oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
Packit a4aae4
            args->d_error = oss.str();
Packit a4aae4
            return (void*) -1;
Packit a4aae4
        }
Packit a4aae4
    }
Packit a4aae4
Packit a4aae4
    delete [] args->d_buf;
Packit a4aae4
    delete args;
Packit a4aae4
Packit a4aae4
    return 0;
Packit a4aae4
}
Packit a4aae4