|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
// -*- mode: c++; c-basic-offset:4 -*-
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
// This file is part of libdap, A C++ implementation of the OPeNDAP Data
|
|
Packit |
a4aae4 |
// Access Protocol.
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
// Copyright (c) 2015 OPeNDAP, Inc.
|
|
Packit |
a4aae4 |
// Author: James Gallagher <jgallagher@opendap.org>
|
|
Packit |
a4aae4 |
//
|
|
Packit |
a4aae4 |
// This library is free software; you can redistribute it and/or
|
|
Packit |
a4aae4 |
// modify it under the terms of the GNU Lesser General Public
|
|
Packit |
a4aae4 |
// License as published by the Free Software Foundation; either
|
|
Packit |
a4aae4 |
// version 2.1 of the License, or (at your option) any later version.
|
|
Packit |
a4aae4 |
//
|
|
Packit |
a4aae4 |
// This library is distributed in the hope that it will be useful,
|
|
Packit |
a4aae4 |
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
Packit |
a4aae4 |
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Packit |
a4aae4 |
// Lesser General Public License for more details.
|
|
Packit |
a4aae4 |
//
|
|
Packit |
a4aae4 |
// You should have received a copy of the GNU Lesser General Public
|
|
Packit |
a4aae4 |
// License along with this library; if not, write to the Free Software
|
|
Packit |
a4aae4 |
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
Packit |
a4aae4 |
//
|
|
Packit |
a4aae4 |
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/*
|
|
Packit |
a4aae4 |
* MarshallerThread.h
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
* Created on: Aug 27, 2015
|
|
Packit |
a4aae4 |
* Author: jimg
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#ifndef MARSHALLERTHREAD_H_
|
|
Packit |
a4aae4 |
#define MARSHALLERTHREAD_H_
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#include <pthread.h>
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#include <iostream>
|
|
Packit |
a4aae4 |
#include <ostream>
|
|
Packit |
a4aae4 |
#include <string>
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
namespace libdap {
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* RAII for the MarshallerThread mutex and condition variable. Used by the
|
|
Packit |
a4aae4 |
* Main thread. The constructor locks the mutex and then, if the count of
|
|
Packit |
a4aae4 |
* child threads is not zero, blocks on the associated condition variable.
|
|
Packit |
a4aae4 |
* When signaled by the child thread using the condition variable (the child
|
|
Packit |
a4aae4 |
* thread count should then be zero), the mutex is (re)locked and the ctor
|
|
Packit |
a4aae4 |
* returns. The destructor unlocks the mutex.
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
class Locker {
public:
    /**
     * Build a guard for the main thread. Only declared here; the
     * definition lives in the implementation file.
     *
     * @param lock  Mutex this guard is bound to (held by reference in m_mutex).
     * @param cond  Condition variable associated with the mutex. It is not
     *              stored in this class, so it can only be used during
     *              construction.
     * @param count Child thread count. Also not stored in this class.
     */
    Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count);

    /** Releases the guard; see the class comment for the intended protocol. */
    virtual ~Locker();

private:
    pthread_mutex_t& m_mutex;   // the mutex this guard refers to; never owned

    // Declared private and intentionally left undefined: a guard must always
    // be bound to a mutex, and copying one would release the mutex twice.
    Locker();
    Locker(const Locker &rhs);
};
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* Synchronization for the child thread in the multi-threaded version of
|
|
Packit |
a4aae4 |
* the DAP2 and DAP4 (when it gets implemented) 'Marshaller' class used to
|
|
Packit |
a4aae4 |
* send data. The class declared below (MarshallerThread) manages the
|
|
Packit |
a4aae4 |
* child thread.
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
* The ctor of this class simply
|
|
Packit |
a4aae4 |
* locks the mutex; the dtor clears the child thread count, signals that
|
|
Packit |
a4aae4 |
* count has changed and unlocks the mutex.
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
class ChildLocker {
public:
    /**
     * Build a guard for the child thread. Only declared here; the
     * definition lives in the implementation file.
     *
     * @param lock  Mutex this guard is bound to (held by reference in m_mutex).
     * @param cond  Condition variable held by reference in m_cond.
     * @param count Child thread count held by reference in m_count.
     */
    ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count);

    /** Releases the guard; see the class comment for the intended protocol. */
    virtual ~ChildLocker();

private:
    pthread_mutex_t& m_mutex;   // the mutex this guard refers to; never owned
    pthread_cond_t& m_cond;     // condition variable paired with m_mutex
    int& m_count;               // the shared child thread count

    // Declared private and intentionally left undefined so that a guard can
    // be neither default-constructed nor copied. The copy constructor must
    // take a ChildLocker: a 'const Locker &' parameter would only declare a
    // converting constructor and leave the implicit public copy constructor
    // in place, allowing a second guard to release the same mutex.
    ChildLocker();
    ChildLocker(const ChildLocker &rhs);
};
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
/**
|
|
Packit |
a4aae4 |
* Implement a multi-threaded data transmission sub-system for libdap.
|
|
Packit |
a4aae4 |
* This class makes it fairly painless to send data using a child thread
|
|
Packit |
a4aae4 |
* so that the main thread can be used to read the next chunk of data
|
|
Packit |
a4aae4 |
* while whatever has been read to this point is sent over the wire.
|
|
Packit |
a4aae4 |
*
|
|
Packit |
a4aae4 |
* This code is used by XDRStreamMarshaller and (soon) D4StreamMarshaller.
|
|
Packit |
a4aae4 |
*/
|
|
Packit |
a4aae4 |
class MarshallerThread {
private:
    pthread_t d_thread;
    pthread_attr_t d_thread_attr;

    pthread_mutex_t d_out_mutex;
    pthread_cond_t d_out_cond;

    int d_child_thread_count;   // 0 or 1
    std::string d_thread_error; // a non-empty string indicates an error

#if 0
    static bool print_time; // false by default
#endif

    // Not implemented. This object holds a pthread mutex, condition variable
    // and thread attributes by value; POSIX leaves the behavior of copies of
    // those objects undefined, so copying a MarshallerThread is disabled.
    MarshallerThread(const MarshallerThread &rhs);
    MarshallerThread &operator=(const MarshallerThread &rhs);

    /**
     * Used to pass information into the static methods that run the
     * simple stream writer threads. This can pass either an ostream or
     * a file descriptor. If a fd is passed, the ostream reference is
     * set to stderr (i.e., std::cerr).
     *
     * All of the synchronization members are references; this struct
     * owns none of them.
     */
    struct write_args {
        pthread_mutex_t &d_mutex;
        pthread_cond_t &d_cond;
        int &d_count;
        std::string &d_error;
        std::ostream &d_out;    // The output stream protected by the mutex, ...
        int d_out_file;         // file descriptor; if not -1, use this.
        char *d_buf;            // The data to write to the stream
        // NOTE(review): start_thread() takes the size as unsigned int while
        // this field is a signed int - confirm sizes never exceed INT_MAX.
        int d_num;              // The size of d_buf

        /**
         * Build args for an ostream. The file descriptor is set to -1.
         */
        write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, std::ostream &s, char *vals, int num) :
            d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(s), d_out_file(-1), d_buf(vals), d_num(num)
        {
        }

        /**
         * Build args for a file descriptor. The ostream is set to cerr (because it is
         * a reference and has to be initialized to something).
         */
        write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, int fd, char *vals, int num) :
            d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(std::cerr), d_out_file(fd), d_buf(vals), d_num(num)
        {
        }
    };

public:
    MarshallerThread();
    virtual ~MarshallerThread();

    /** @return The mutex member, by non-const reference. */
    pthread_mutex_t &get_mutex() { return d_out_mutex; }

    /** @return The condition variable member, by non-const reference. */
    pthread_cond_t &get_cond() { return d_out_cond; }

    /** @return The child thread count, by non-const reference. */
    int &get_child_thread_count() { return d_child_thread_count; }

    /** Add one to the child thread count. No locking is done here. */
    void increment_child_thread_count() { ++d_child_thread_count; }

    /**
     * Start-thread entry points; defined in the implementation file.
     *
     * @param thread        Start routine with the signature pthread_create()
     *                      expects (e.g., write_thread or write_thread_part).
     * @param out / fd      Destination: an ostream or a file descriptor.
     * @param byte_buf      The data to send.
     * @param bytes_written Size associated with byte_buf.
     */
    void start_thread(void* (*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written);
    void start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes_written);

    // These are static so they have no implicit 'this' argument and so match
    // the 'void *(*)(void *)' start-routine signature required when they
    // are passed to pthread_create()
    static void *write_thread(void *arg);
    static void *write_thread_part(void *arg);

#if 0
    static void set_print_time(bool state) { print_time = state; }
    static bool get_print_time() { return print_time; }
#endif

};
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
}
|
|
Packit |
a4aae4 |
|
|
Packit |
a4aae4 |
#endif /* MARSHALLERTHREAD_H_ */
|