|
Packit |
a4aae4 |
// -*- mode: c++; c-basic-offset:4 -*-
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
// This file is part of libdap, A C++ implementation of the OPeNDAP Data
|
|
Packit |
a4aae4 |
// Access Protocol.
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
// Copyright (c) 2015 OPeNDAP, Inc.
|
|
Packit |
a4aae4 |
// Author: James Gallagher <jgallagher@opendap.org>
|
|
Packit |
a4aae4 |
//
|
|
Packit |
a4aae4 |
// This library is free software; you can redistribute it and/or
|
|
Packit |
a4aae4 |
// modify it under the terms of the GNU Lesser General Public
|
|
Packit |
a4aae4 |
// License as published by the Free Software Foundation; either
|
|
Packit |
a4aae4 |
// version 2.1 of the License, or (at your option) any later version.
|
|
Packit |
a4aae4 |
//
|
|
Packit |
a4aae4 |
// This library is distributed in the hope that it will be useful,
|
|
Packit |
a4aae4 |
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
Packit |
a4aae4 |
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Packit |
a4aae4 |
// Lesser General Public License for more details.
|
|
Packit |
a4aae4 |
//
|
|
Packit |
a4aae4 |
// You should have received a copy of the GNU Lesser General Public
|
|
Packit |
a4aae4 |
// License along with this library; if not, write to the Free Software
|
|
Packit |
a4aae4 |
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
Packit |
a4aae4 |
//
|
|
Packit |
a4aae4 |
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/*
|
|
Packit |
a4aae4 |
* MarshallerThread.cc
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
* Created on: Aug 27, 2015
|
|
Packit |
a4aae4 |
* Author: jimg
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#include "config.h"
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#include <pthread.h>
|
|
Packit |
a4aae4 |
#include <sys/time.h>
|
|
Packit |
a4aae4 |
#include <fcntl.h>
|
|
Packit |
a4aae4 |
#include <unistd.h>
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#include <ostream>
|
|
Packit |
a4aae4 |
#include <sstream>
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#include "MarshallerThread.h"
|
|
Packit |
a4aae4 |
#include "Error.h"
|
|
Packit |
a4aae4 |
#include "InternalErr.h"
|
|
Packit |
a4aae4 |
#include "debug.h"
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
using namespace libdap;
|
|
Packit |
a4aae4 |
using namespace std;
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#if 0
|
|
Packit |
a4aae4 |
// Flag read by the (currently disabled) timing code in write_thread(): when
// true, that code prints the wall-clock time taken by the child thread's write.
bool MarshallerThread::print_time = false;
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
 * Compute the elapsed time between two timeval structures, such as those
 * filled in by gettimeofday(). Use this to measure real time (instead of
 * the user time that is returned by std::clock() or getrusage()).
 *
 * @note Despite the name, the value returned is in seconds (with a
 * fractional part), not in hundredths of a second.
 *
 * @param stop The later time.
 * @param start The earlier time. It is read but never modified.
 * @return The difference (stop - start) in seconds.
 */
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start)
{
    // The two fields are combined in floating point, so a negative
    // microsecond difference is simply subtracted from the whole seconds;
    // no integer borrow/carry between tv_sec and tv_usec is required and
    // the caller's structures are left untouched.
    const double whole_seconds = static_cast<double>(stop->tv_sec - start->tv_sec);
    const double micro_seconds = static_cast<double>(stop->tv_usec - start->tv_usec);

    return whole_seconds + micro_seconds / 1000000;
}
|
|
Packit |
a4aae4 |
#endif
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* Lock the mutex then wait for the child thread to signal using the
|
|
Packit |
a4aae4 |
* condition variable 'cond'. Once the signal is received, re-test count
|
|
Packit |
a4aae4 |
* to make sure it's zero (there are no child threads).
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
* This is used to lock the main thread and ensure that a second child
|
|
Packit |
a4aae4 |
* (writer) thread is not started until any current child thread completes,
|
|
Packit |
a4aae4 |
* which keeps the write operations in the correct order.
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
|
|
Packit |
a4aae4 |
m_mutex(lock)
|
|
Packit |
a4aae4 |
{
|
|
Packit |
a4aae4 |
int status = pthread_mutex_lock(&m_mutex);
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
DBG(cerr << "Locking the mutex! (waiting; " << pthread_self() << ")" << endl);
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
|
|
Packit |
a4aae4 |
while (count != 0) {
|
|
Packit |
a4aae4 |
status = pthread_cond_wait(&cond, &m_mutex);
|
|
Packit |
a4aae4 |
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
if (count != 0) throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* Unlock the mutex
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
Locker::~Locker()
|
|
Packit |
a4aae4 |
{
|
|
Packit |
a4aae4 |
DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
int status = pthread_mutex_unlock(&m_mutex);
|
|
Packit |
a4aae4 |
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* Lock the mutex, but do not wait on the condition variable.
|
|
Packit |
a4aae4 |
* This is used by the child thread; it helps ensure that the
|
|
Packit |
a4aae4 |
* mutex is unlocked and the predicate is reset no matter how the
|
|
Packit |
a4aae4 |
* child thread is exited.
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
* Note we how a reference to the shared 'count' predicate that
|
|
Packit |
a4aae4 |
* tells how many (0 or 1) child threads exist so that when this
|
|
Packit |
a4aae4 |
* version of the Locker object is destroyed, we can zero that.
|
|
Packit |
a4aae4 |
* This enables us to use RAII in the child thread and ensure
|
|
Packit |
a4aae4 |
* the invariant if there is an error and the code exits with a
|
|
Packit |
a4aae4 |
* summary return.
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
|
|
Packit |
a4aae4 |
m_mutex(lock), m_cond(cond), m_count(count)
|
|
Packit |
a4aae4 |
{
|
|
Packit |
a4aae4 |
int status = pthread_mutex_lock(&m_mutex);
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
 * Zero the shared child thread count, signal the condition variable and
 * unlock the mutex.
 *
 * The mutex is unlocked even when the signal fails; stopping after a failed
 * signal would leave the mutex locked. Failures are not reported with an
 * exception because throwing from a destructor ends the program via
 * std::terminate() when the destructor is implicitly noexcept (C++11 and
 * later) or when it runs during stack unwinding.
 */
ChildLocker::~ChildLocker()
{
    DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);

    m_count = 0;

    (void) pthread_cond_signal(&m_cond);
    (void) pthread_mutex_unlock(&m_mutex);
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
 * Initialize the thread attributes (child threads are created detached),
 * the output mutex and the output condition variable. The child thread
 * count starts at zero.
 *
 * @exception Error Thrown if any of the pthread objects cannot be
 * initialized. Objects that were already initialized are destroyed first,
 * because the destructor does not run when the constructor throws.
 */
MarshallerThread::MarshallerThread() :
    d_thread(0), d_child_thread_count(0)
{
    if (pthread_attr_init(&d_thread_attr) != 0) throw Error(internal_error, "Failed to initialize pthread attributes.");

    if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0) {
        (void) pthread_attr_destroy(&d_thread_attr);
        throw Error(internal_error, "Failed to complete pthread attribute initialization.");
    }

    if (pthread_mutex_init(&d_out_mutex, 0) != 0) {
        (void) pthread_attr_destroy(&d_thread_attr);
        throw Error(internal_error, "Failed to initialize mutex.");
    }

    if (pthread_cond_init(&d_out_cond, 0) != 0) {
        (void) pthread_mutex_destroy(&d_out_mutex);
        (void) pthread_attr_destroy(&d_thread_attr);
        throw Error(internal_error, "Failed to initialize cond.");
    }
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
 * Wait until the child thread count is zero, then destroy the mutex, the
 * condition variable and the thread attributes.
 *
 * Errors from the pthread calls are not reported with exceptions. Throwing
 * from a destructor ends the program via std::terminate() when the
 * destructor is implicitly noexcept (C++11 and later) or when it runs
 * during stack unwinding.
 */
MarshallerThread::~MarshallerThread()
{
    if (pthread_mutex_lock(&d_out_mutex) == 0) {
        while (d_child_thread_count != 0) {
            // Give up if the wait itself fails; repeating a failing
            // pthread_cond_wait() would spin here forever.
            if (pthread_cond_wait(&d_out_cond, &d_out_mutex) != 0) break;
        }

        (void) pthread_mutex_unlock(&d_out_mutex);
    }

    (void) pthread_mutex_destroy(&d_out_mutex);
    (void) pthread_cond_destroy(&d_out_cond);

    (void) pthread_attr_destroy(&d_thread_attr);
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
// not a static method
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* Start the child thread, using the arguments given. This will write 'bytes'
|
|
Packit |
a4aae4 |
* bytes from 'byte_buf' to the output stream 'out'
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf,
|
|
Packit |
a4aae4 |
unsigned int bytes)
|
|
Packit |
a4aae4 |
{
|
|
Packit |
a4aae4 |
write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf,
|
|
Packit |
a4aae4 |
bytes);
|
|
Packit |
a4aae4 |
int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
|
|
Packit |
a4aae4 |
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* Write 'bytes' bytes from 'byte_buf' to the file descriptor 'fd'.
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes)
|
|
Packit |
a4aae4 |
{
|
|
Packit |
a4aae4 |
write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf,
|
|
Packit |
a4aae4 |
bytes);
|
|
Packit |
a4aae4 |
int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
|
|
Packit |
a4aae4 |
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* This static method is used to write data to the ostream referenced
|
|
Packit |
a4aae4 |
* by the ostream element of write_args. This is used by start_thread()
|
|
Packit |
a4aae4 |
* and passed to pthread_create()
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
* @note The write_args argument may contain either a file descriptor
|
|
Packit |
a4aae4 |
* (d_out_file) or an ostream& (d_out). If the file descriptor is not
|
|
Packit |
a4aae4 |
* -1, then use that, else use the ostream reference.
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
void *
|
|
Packit |
a4aae4 |
MarshallerThread::write_thread(void *arg)
|
|
Packit |
a4aae4 |
{
|
|
Packit |
a4aae4 |
write_args *args = reinterpret_cast<write_args *>(arg);
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#if 0
|
|
Packit |
a4aae4 |
struct timeval tp_s;
|
|
Packit |
a4aae4 |
if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl;
|
|
Packit |
a4aae4 |
#endif
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
// force an error
|
|
Packit |
a4aae4 |
// return (void*)-1;
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
if (args->d_out_file != -1) {
|
|
Packit |
a4aae4 |
int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
|
|
Packit |
a4aae4 |
if (bytes_written != args->d_num)
|
|
Packit |
a4aae4 |
return (void*) -1;
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
else {
|
|
Packit |
a4aae4 |
args->d_out.write(args->d_buf, args->d_num);
|
|
Packit |
a4aae4 |
if (args->d_out.fail()) {
|
|
Packit |
a4aae4 |
ostringstream oss;
|
|
Packit |
a4aae4 |
oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
|
|
Packit |
a4aae4 |
args->d_error = oss.str();
|
|
Packit |
a4aae4 |
return (void*) -1;
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
delete [] args->d_buf;
|
|
Packit |
a4aae4 |
delete args;
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#if 0
|
|
Packit |
a4aae4 |
struct timeval tp_e;
|
|
Packit |
a4aae4 |
if (print_time) {
|
|
Packit |
a4aae4 |
if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl;
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
#endif
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
return 0;
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* This static method is used to write data to the ostream referenced
|
|
Packit |
a4aae4 |
* by the ostream element of write_args. This is used by start_thread()
|
|
Packit |
a4aae4 |
* and passed to pthread_create()
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
* @note This differers from MarshallerThread::write_thread() in that it
|
|
Packit |
a4aae4 |
* writes data starting _after_ the four-byte length prefix that XDR
|
|
Packit |
a4aae4 |
* adds to the data. It is used for the put_vector_part() calls in
|
|
Packit |
a4aae4 |
* XDRStreamMarshaller.
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
* @return 0 if successful, -1 otherwise.
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
void *
|
|
Packit |
a4aae4 |
MarshallerThread::write_thread_part(void *arg)
|
|
Packit |
a4aae4 |
{
|
|
Packit |
a4aae4 |
write_args *args = reinterpret_cast<write_args *>(arg);
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
if (args->d_out_file != -1) {
|
|
Packit |
a4aae4 |
int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
|
|
Packit |
a4aae4 |
if (bytes_written != args->d_num) return (void*) -1;
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
else {
|
|
Packit |
a4aae4 |
args->d_out.write(args->d_buf + 4, args->d_num);
|
|
Packit |
a4aae4 |
if (args->d_out.fail()) {
|
|
Packit |
a4aae4 |
ostringstream oss;
|
|
Packit |
a4aae4 |
oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
|
|
Packit |
a4aae4 |
args->d_error = oss.str();
|
|
Packit |
a4aae4 |
return (void*) -1;
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
delete [] args->d_buf;
|
|
Packit |
a4aae4 |
delete args;
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
return 0;
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|