Blame kcthread.h

Packit Service ac2942
/*************************************************************************************************
Packit Service ac2942
 * Threading devices
Packit Service ac2942
 *                                                               Copyright (C) 2009-2012 FAL Labs
Packit Service ac2942
 * This file is part of Kyoto Cabinet.
Packit Service ac2942
 * This program is free software: you can redistribute it and/or modify it under the terms of
Packit Service ac2942
 * the GNU General Public License as published by the Free Software Foundation, either version
Packit Service ac2942
 * 3 of the License, or any later version.
Packit Service ac2942
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
Packit Service ac2942
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
Packit Service ac2942
 * See the GNU General Public License for more details.
Packit Service ac2942
 * You should have received a copy of the GNU General Public License along with this program.
Packit Service ac2942
 * If not, see <http://www.gnu.org/licenses/>.
Packit Service ac2942
 *************************************************************************************************/
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
#ifndef _KCTHREAD_H                      // duplication check
Packit Service ac2942
#define _KCTHREAD_H
Packit Service ac2942
Packit Service ac2942
#include <kccommon.h>
Packit Service ac2942
#include <kcutil.h>
Packit Service ac2942
Packit Service ac2942
namespace kyotocabinet {                 // common namespace
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Threading device.
Packit Service ac2942
 */
Packit Service ac2942
class Thread {
 public:
  /**
   * Default constructor.  Takes no arguments.
   */
  explicit Thread();
  /**
   * Destructor.  Virtual, because the class is meant to be derived from.
   */
  virtual ~Thread();
  /**
   * Perform the concrete process.
   * @note Pure virtual: a derived class must supply the implementation.
   */
  virtual void run() = 0;
  /**
   * Start the thread.
   */
  void start();
  /**
   * Wait for the thread to finish.
   */
  void join();
  /**
   * Put the thread in the detached state.
   */
  void detach();
  /**
   * Terminate the running thread.
   */
  static void exit();
  /**
   * Yield the processor from the current thread.
   */
  static void yield();
  /**
   * Chill the processor by suspending execution for a quick moment.
   */
  static void chill();
  /**
   * Suspend execution of the current thread.
   * @param sec the interval of the suspension in seconds.
   * @return true on success, or false on failure.
   */
  static bool sleep(double sec);
  /**
   * Get the hash value of the current thread.
   * @return the hash value of the current thread.
   */
  static int64_t hash();
 private:
  /** Copy constructor, declared privately so that copying is forbidden. */
  Thread(const Thread&);
  /** Assignment operator, declared privately so that assignment is forbidden. */
  Thread& operator =(const Thread&);
  /** Opaque pointer to the implementation data. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Basic mutual exclusion device.
Packit Service ac2942
 */
Packit Service ac2942
class Mutex {
  // CondVar is granted access to the private members of the mutex.
  friend class CondVar;
 public:
  /**
   * Type of the behavior for double locking.
   */
  enum Type {
    FAST,                ///< no operation
    ERRORCHECK,          ///< check error
    RECURSIVE            ///< allow recursive locking
  };
  /**
   * Default constructor.  Takes no arguments.
   */
  explicit Mutex();
  /**
   * Constructor with an explicit locking behavior.
   * @param type the behavior for double locking.
   */
  explicit Mutex(Type type);
  /**
   * Destructor.
   */
  ~Mutex();
  /**
   * Get the lock.
   */
  void lock();
  /**
   * Try to get the lock.
   * @return true on success, or false on failure.
   */
  bool lock_try();
  /**
   * Try to get the lock.
   * @param sec the interval of the suspension in seconds.
   * @return true on success, or false on failure.
   */
  bool lock_try(double sec);
  /**
   * Release the lock.
   */
  void unlock();
 private:
  /** Copy constructor, declared privately so that copying is forbidden. */
  Mutex(const Mutex&);
  /** Assignment operator, declared privately so that assignment is forbidden. */
  Mutex& operator =(const Mutex&);
  /** Opaque pointer to the implementation data. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Scoped mutex device.
Packit Service ac2942
 */
Packit Service ac2942
class ScopedMutex {
Packit Service ac2942
 public:
Packit Service ac2942
  /**
Packit Service ac2942
   * Constructor.
Packit Service ac2942
   * @param mutex a mutex to lock the block.
Packit Service ac2942
   */
Packit Service ac2942
  explicit ScopedMutex(Mutex* mutex) : mutex_(mutex) {
Packit Service ac2942
    _assert_(mutex);
Packit Service ac2942
    mutex_->lock();
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Destructor.
Packit Service ac2942
   */
Packit Service ac2942
  ~ScopedMutex() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    mutex_->unlock();
Packit Service ac2942
  }
Packit Service ac2942
 private:
Packit Service ac2942
  /** Dummy constructor to forbid the use. */
Packit Service ac2942
  ScopedMutex(const ScopedMutex&);
Packit Service ac2942
  /** Dummy Operator to forbid the use. */
Packit Service ac2942
  ScopedMutex& operator =(const ScopedMutex&);
Packit Service ac2942
  /** The inner device. */
Packit Service ac2942
  Mutex* mutex_;
Packit Service ac2942
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Slotted mutex device.
Packit Service ac2942
 */
Packit Service ac2942
class SlottedMutex {
 public:
  /**
   * Constructor.
   * @param slotnum the number of slots.
   */
  explicit SlottedMutex(size_t slotnum);
  /**
   * Destructor.
   */
  ~SlottedMutex();
  /**
   * Get the lock of a slot.
   * @param idx the index of a slot.
   */
  void lock(size_t idx);
  /**
   * Release the lock of a slot.
   * @param idx the index of a slot.
   */
  void unlock(size_t idx);
  /**
   * Get the locks of all slots.
   */
  void lock_all();
  /**
   * Release the locks of all slots.
   */
  void unlock_all();
 private:
  // Copying is forbidden, as for the other lock devices in this file: an
  // implicit copy would share the opaque pointer with the original object.
  /** Dummy constructor to forbid the use. */
  SlottedMutex(const SlottedMutex&);
  /** Dummy Operator to forbid the use. */
  SlottedMutex& operator =(const SlottedMutex&);
  /** Opaque pointer. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Lightweight mutual exclusion device.
Packit Service ac2942
 */
Packit Service ac2942
class SpinLock {
 public:
  /**
   * Default constructor.  Takes no arguments.
   */
  explicit SpinLock();
  /**
   * Destructor.
   */
  ~SpinLock();
  /**
   * Get the lock.
   */
  void lock();
  /**
   * Try to get the lock.
   * @return true on success, or false on failure.
   */
  bool lock_try();
  /**
   * Release the lock.
   */
  void unlock();
 private:
  /** Copy constructor, declared privately so that copying is forbidden. */
  SpinLock(const SpinLock&);
  /** Assignment operator, declared privately so that assignment is forbidden. */
  SpinLock& operator =(const SpinLock&);
  /** Opaque pointer to the implementation data. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Scoped spin lock device.
Packit Service ac2942
 */
Packit Service ac2942
class ScopedSpinLock {
Packit Service ac2942
 public:
Packit Service ac2942
  /**
Packit Service ac2942
   * Constructor.
Packit Service ac2942
   * @param spinlock a spin lock to lock the block.
Packit Service ac2942
   */
Packit Service ac2942
  explicit ScopedSpinLock(SpinLock* spinlock) : spinlock_(spinlock) {
Packit Service ac2942
    _assert_(spinlock);
Packit Service ac2942
    spinlock_->lock();
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Destructor.
Packit Service ac2942
   */
Packit Service ac2942
  ~ScopedSpinLock() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    spinlock_->unlock();
Packit Service ac2942
  }
Packit Service ac2942
 private:
Packit Service ac2942
  /** Dummy constructor to forbid the use. */
Packit Service ac2942
  ScopedSpinLock(const ScopedSpinLock&);
Packit Service ac2942
  /** Dummy Operator to forbid the use. */
Packit Service ac2942
  ScopedSpinLock& operator =(const ScopedSpinLock&);
Packit Service ac2942
  /** The inner device. */
Packit Service ac2942
  SpinLock* spinlock_;
Packit Service ac2942
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Slotted spin lock devices.
Packit Service ac2942
 */
Packit Service ac2942
class SlottedSpinLock {
 public:
  /**
   * Constructor.
   * @param slotnum the number of slots.
   */
  explicit SlottedSpinLock(size_t slotnum);
  /**
   * Destructor.
   */
  ~SlottedSpinLock();
  /**
   * Get the lock of a slot.
   * @param idx the index of a slot.
   */
  void lock(size_t idx);
  /**
   * Release the lock of a slot.
   * @param idx the index of a slot.
   */
  void unlock(size_t idx);
  /**
   * Get the locks of all slots.
   */
  void lock_all();
  /**
   * Release the locks of all slots.
   */
  void unlock_all();
 private:
  // Copying is forbidden, as for the other lock devices in this file: an
  // implicit copy would share the opaque pointer with the original object.
  /** Dummy constructor to forbid the use. */
  SlottedSpinLock(const SlottedSpinLock&);
  /** Dummy Operator to forbid the use. */
  SlottedSpinLock& operator =(const SlottedSpinLock&);
  /** Opaque pointer. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Reader-writer locking device.
Packit Service ac2942
 */
Packit Service ac2942
class RWLock {
 public:
  /**
   * Default constructor.  Takes no arguments.
   */
  explicit RWLock();
  /**
   * Destructor.
   */
  ~RWLock();
  /**
   * Get the writer lock.
   */
  void lock_writer();
  /**
   * Try to get the writer lock.
   * @return true on success, or false on failure.
   */
  bool lock_writer_try();
  /**
   * Get a reader lock.
   */
  void lock_reader();
  /**
   * Try to get a reader lock.
   * @return true on success, or false on failure.
   */
  bool lock_reader_try();
  /**
   * Release the lock.
   */
  void unlock();
 private:
  /** Copy constructor, declared privately so that copying is forbidden. */
  RWLock(const RWLock&);
  /** Assignment operator, declared privately so that assignment is forbidden. */
  RWLock& operator =(const RWLock&);
  /** Opaque pointer to the implementation data. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Scoped reader-writer locking device.
Packit Service ac2942
 */
Packit Service ac2942
class ScopedRWLock {
Packit Service ac2942
 public:
Packit Service ac2942
  /**
Packit Service ac2942
   * Constructor.
Packit Service ac2942
   * @param rwlock a rwlock to lock the block.
Packit Service ac2942
   * @param writer true for writer lock, or false for reader lock.
Packit Service ac2942
   */
Packit Service ac2942
  explicit ScopedRWLock(RWLock* rwlock, bool writer) : rwlock_(rwlock) {
Packit Service ac2942
    _assert_(rwlock);
Packit Service ac2942
    if (writer) {
Packit Service ac2942
      rwlock_->lock_writer();
Packit Service ac2942
    } else {
Packit Service ac2942
      rwlock_->lock_reader();
Packit Service ac2942
    }
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Destructor.
Packit Service ac2942
   */
Packit Service ac2942
  ~ScopedRWLock() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    rwlock_->unlock();
Packit Service ac2942
  }
Packit Service ac2942
 private:
Packit Service ac2942
  /** Dummy constructor to forbid the use. */
Packit Service ac2942
  ScopedRWLock(const ScopedRWLock&);
Packit Service ac2942
  /** Dummy Operator to forbid the use. */
Packit Service ac2942
  ScopedRWLock& operator =(const ScopedRWLock&);
Packit Service ac2942
  /** The inner device. */
Packit Service ac2942
  RWLock* rwlock_;
Packit Service ac2942
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Slotted reader-writer lock devices.
Packit Service ac2942
 */
Packit Service ac2942
class SlottedRWLock {
 public:
  /**
   * Constructor.
   * @param slotnum the number of slots.
   */
  explicit SlottedRWLock(size_t slotnum);
  /**
   * Destructor.
   */
  ~SlottedRWLock();
  /**
   * Get the writer lock of a slot.
   * @param idx the index of a slot.
   */
  void lock_writer(size_t idx);
  /**
   * Get the reader lock of a slot.
   * @param idx the index of a slot.
   */
  void lock_reader(size_t idx);
  /**
   * Release the lock of a slot.
   * @param idx the index of a slot.
   */
  void unlock(size_t idx);
  /**
   * Get the writer locks of all slots.
   */
  void lock_writer_all();
  /**
   * Get the reader locks of all slots.
   */
  void lock_reader_all();
  /**
   * Release the locks of all slots.
   */
  void unlock_all();
 private:
  // Copying is forbidden, as for the other lock devices in this file: an
  // implicit copy would share the opaque pointer with the original object.
  /** Dummy constructor to forbid the use. */
  SlottedRWLock(const SlottedRWLock&);
  /** Dummy Operator to forbid the use. */
  SlottedRWLock& operator =(const SlottedRWLock&);
  /** Opaque pointer. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Lightweight reader-writer locking device.
Packit Service ac2942
 */
Packit Service ac2942
class SpinRWLock {
 public:
  /**
   * Default constructor.  Takes no arguments.
   */
  explicit SpinRWLock();
  /**
   * Destructor.
   */
  ~SpinRWLock();
  /**
   * Get the writer lock.
   */
  void lock_writer();
  /**
   * Try to get the writer lock.
   * @return true on success, or false on failure.
   */
  bool lock_writer_try();
  /**
   * Get a reader lock.
   */
  void lock_reader();
  /**
   * Try to get a reader lock.
   * @return true on success, or false on failure.
   */
  bool lock_reader_try();
  /**
   * Release the lock.
   */
  void unlock();
  /**
   * Promote a reader lock to the writer lock.
   * @return true on success, or false on failure.
   */
  bool promote();
  /**
   * Demote the writer lock to a reader lock.
   */
  void demote();
 private:
  /** Copy constructor, declared privately so that copying is forbidden. */
  SpinRWLock(const SpinRWLock&);
  /** Assignment operator, declared privately so that assignment is forbidden. */
  SpinRWLock& operator =(const SpinRWLock&);
  /** Opaque pointer to the implementation data. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Scoped reader-writer locking device.
Packit Service ac2942
 */
Packit Service ac2942
class ScopedSpinRWLock {
Packit Service ac2942
 public:
Packit Service ac2942
  /**
Packit Service ac2942
   * Constructor.
Packit Service ac2942
   * @param srwlock a spin rwlock to lock the block.
Packit Service ac2942
   * @param writer true for writer lock, or false for reader lock.
Packit Service ac2942
   */
Packit Service ac2942
  explicit ScopedSpinRWLock(SpinRWLock* srwlock, bool writer) : srwlock_(srwlock) {
Packit Service ac2942
    _assert_(srwlock);
Packit Service ac2942
    if (writer) {
Packit Service ac2942
      srwlock_->lock_writer();
Packit Service ac2942
    } else {
Packit Service ac2942
      srwlock_->lock_reader();
Packit Service ac2942
    }
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Destructor.
Packit Service ac2942
   */
Packit Service ac2942
  ~ScopedSpinRWLock() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    srwlock_->unlock();
Packit Service ac2942
  }
Packit Service ac2942
 private:
Packit Service ac2942
  /** Dummy constructor to forbid the use. */
Packit Service ac2942
  ScopedSpinRWLock(const ScopedSpinRWLock&);
Packit Service ac2942
  /** Dummy Operator to forbid the use. */
Packit Service ac2942
  ScopedSpinRWLock& operator =(const ScopedSpinRWLock&);
Packit Service ac2942
  /** The inner device. */
Packit Service ac2942
  SpinRWLock* srwlock_;
Packit Service ac2942
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Slotted lightweight reader-writer lock devices.
Packit Service ac2942
 */
Packit Service ac2942
class SlottedSpinRWLock {
 public:
  /**
   * Constructor.
   * @param slotnum the number of slots.
   */
  explicit SlottedSpinRWLock(size_t slotnum);
  /**
   * Destructor.
   */
  ~SlottedSpinRWLock();
  /**
   * Get the writer lock of a slot.
   * @param idx the index of a slot.
   */
  void lock_writer(size_t idx);
  /**
   * Get the reader lock of a slot.
   * @param idx the index of a slot.
   */
  void lock_reader(size_t idx);
  /**
   * Release the lock of a slot.
   * @param idx the index of a slot.
   */
  void unlock(size_t idx);
  /**
   * Get the writer locks of all slots.
   */
  void lock_writer_all();
  /**
   * Get the reader locks of all slots.
   */
  void lock_reader_all();
  /**
   * Release the locks of all slots.
   */
  void unlock_all();
 private:
  // Copying is forbidden, as for the other lock devices in this file: an
  // implicit copy would share the opaque pointer with the original object.
  /** Dummy constructor to forbid the use. */
  SlottedSpinRWLock(const SlottedSpinRWLock&);
  /** Dummy Operator to forbid the use. */
  SlottedSpinRWLock& operator =(const SlottedSpinRWLock&);
  /** Opaque pointer. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Condition variable.
Packit Service ac2942
 */
Packit Service ac2942
class CondVar {
Packit Service ac2942
 public:
Packit Service ac2942
  /**
Packit Service ac2942
   * Default constructor.
Packit Service ac2942
   */
Packit Service ac2942
  explicit CondVar();
Packit Service ac2942
  /**
Packit Service ac2942
   * Destructor.
Packit Service ac2942
   */
Packit Service ac2942
  ~CondVar();
Packit Service ac2942
  /**
Packit Service ac2942
   * Wait for the signal.
Packit Service ac2942
   * @param mutex a locked mutex.
Packit Service ac2942
   */
Packit Service ac2942
  void wait(Mutex* mutex);
Packit Service ac2942
  /**
Packit Service ac2942
   * Wait for the signal.
Packit Service ac2942
   * @param mutex a locked mutex.
Packit Service ac2942
   * @param sec the interval of the suspension in seconds.
Packit Service ac2942
   * @return true on catched signal, or false on timeout.
Packit Service ac2942
   */
Packit Service ac2942
  bool wait(Mutex* mutex, double sec);
Packit Service ac2942
  /**
Packit Service ac2942
   * Send the wake-up signal to another waiting thread.
Packit Service ac2942
   * @note The mutex used for the wait method should be locked by the caller.
Packit Service ac2942
   */
Packit Service ac2942
  void signal();
Packit Service ac2942
  /**
Packit Service ac2942
   * Send the wake-up signals to all waiting threads.
Packit Service ac2942
   * @note The mutex used for the wait method should be locked by the caller.
Packit Service ac2942
   */
Packit Service ac2942
  void broadcast();
Packit Service ac2942
 private:
Packit Service ac2942
  /** Dummy constructor to forbid the use. */
Packit Service ac2942
  CondVar(const CondVar&);
Packit Service ac2942
  /** Dummy Operator to forbid the use. */
Packit Service ac2942
  CondVar& operator =(const CondVar&);
Packit Service ac2942
  /** Opaque pointer. */
Packit Service ac2942
  void* opq_;
Packit Service ac2942
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Associative condition variable.
Packit Service ac2942
 */
Packit Service ac2942
class CondMap {
Packit Service ac2942
 private:
Packit Service ac2942
  // Forward declarations of the helper structures used by this class.
  struct Count;
  struct Slot;
  /** Map from a key string to the counter record of that key. */
  typedef std::map<std::string, Count> CountMap;
  /** The number of slots. */
  static const size_t SLOTNUM = 64;
Packit Service ac2942
 public:
Packit Service ac2942
  /**
Packit Service ac2942
   * Default constructor.
Packit Service ac2942
   */
Packit Service ac2942
  explicit CondMap() : slots_() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Destructor.
Packit Service ac2942
   */
Packit Service ac2942
  ~CondMap() {
    // No explicit clean-up is performed here.
    _assert_(true);
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Wait for a signal.
Packit Service ac2942
   * @param kbuf the pointer to the key region.
Packit Service ac2942
   * @param ksiz the size of the key region.
Packit Service ac2942
   * @param sec the interval of the suspension in seconds.  If it is negative, no timeout is
Packit Service ac2942
   * specified.
Packit Service ac2942
   * @return true on caught signal, or false on timeout.
Packit Service ac2942
   */
Packit Service ac2942
  bool wait(const char* kbuf, size_t ksiz, double sec = -1) {
Packit Service ac2942
    _assert_(kbuf && ksiz <= MEMMAXSIZ);
Packit Service ac2942
    std::string key(kbuf, ksiz);
Packit Service ac2942
    return wait(key, sec);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Wait for a signal by a key.
Packit Service ac2942
   * @param key the key.
Packit Service ac2942
   * @param sec the interval of the suspension in seconds.  If it is negative, no timeout is
Packit Service ac2942
   * specified.
Packit Service ac2942
   * @return true on caught signal, or false on timeout.
Packit Service ac2942
   */
Packit Service ac2942
  bool wait(const std::string& key, double sec = -1) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    double invtime = sec < 0 ? 1.0 : sec;
Packit Service ac2942
    double curtime = time();
Packit Service ac2942
    double endtime = curtime + (sec < 0 ? UINT32MAX : sec);
Packit Service ac2942
    Slot* slot = get_slot(key);
Packit Service ac2942
    while (curtime < endtime) {
Packit Service ac2942
      ScopedMutex lock(&slot->mutex);
Packit Service ac2942
      CountMap::iterator cit = slot->counter.find(key);
Packit Service ac2942
      if (cit == slot->counter.end()) {
Packit Service ac2942
        Count cnt = { 1, false };
Packit Service ac2942
        slot->counter[key] = cnt;
Packit Service ac2942
      } else {
Packit Service ac2942
        cit->second.num++;
Packit Service ac2942
      }
Packit Service ac2942
      slot->cond.wait(&slot->mutex, invtime);
Packit Service ac2942
      cit = slot->counter.find(key);
Packit Service ac2942
      cit->second.num--;
Packit Service ac2942
      if (cit->second.wake > 0) {
Packit Service ac2942
        cit->second.wake--;
Packit Service ac2942
        if (cit->second.num < 1) slot->counter.erase(cit);
Packit Service ac2942
        return true;
Packit Service ac2942
      }
Packit Service ac2942
      if (cit->second.num < 1) slot->counter.erase(cit);
Packit Service ac2942
      curtime = time();
Packit Service ac2942
    }
Packit Service ac2942
    return false;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Send a wake-up signal to another thread waiting by a key.
Packit Service ac2942
   * @param kbuf the pointer to the key region.
Packit Service ac2942
   * @param ksiz the size of the key region.
Packit Service ac2942
   * @return the number of threads waiting for the signal.
Packit Service ac2942
   */
Packit Service ac2942
  size_t signal(const char* kbuf, size_t ksiz) {
Packit Service ac2942
    _assert_(kbuf && ksiz <= MEMMAXSIZ);
Packit Service ac2942
    std::string key(kbuf, ksiz);
Packit Service ac2942
    return signal(key);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Send a wake-up signal to another thread waiting by a key.
Packit Service ac2942
   * @param key the key.
Packit Service ac2942
   * @return the number of threads waiting for the signal.
Packit Service ac2942
   */
Packit Service ac2942
  size_t signal(const std::string& key) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    Slot* slot = get_slot(key);
Packit Service ac2942
    ScopedMutex lock(&slot->mutex);
Packit Service ac2942
    CountMap::iterator cit = slot->counter.find(key);
Packit Service ac2942
    if (cit == slot->counter.end() || cit->second.num < 1) return 0;
Packit Service ac2942
    if (cit->second.wake < cit->second.num) cit->second.wake++;
Packit Service ac2942
    slot->cond.broadcast();
Packit Service ac2942
    return cit->second.num;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Send wake-up signals to all threads waiting by a key.
Packit Service ac2942
   * @param kbuf the pointer to the key region.
Packit Service ac2942
   * @param ksiz the size of the key region.
Packit Service ac2942
   * @return the number of threads waiting for the signal.
Packit Service ac2942
   */
Packit Service ac2942
  size_t broadcast(const char* kbuf, size_t ksiz) {
Packit Service ac2942
    _assert_(kbuf && ksiz <= MEMMAXSIZ);
Packit Service ac2942
    std::string key(kbuf, ksiz);
Packit Service ac2942
    return broadcast(key);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Send wake-up signals to all threads waiting by a key.
Packit Service ac2942
   * @param key the key.
Packit Service ac2942
   * @return the number of threads waiting for the signal.
Packit Service ac2942
   */
Packit Service ac2942
  size_t broadcast(const std::string& key) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    Slot* slot = get_slot(key);
Packit Service ac2942
    ScopedMutex lock(&slot->mutex);
Packit Service ac2942
    CountMap::iterator cit = slot->counter.find(key);
Packit Service ac2942
    if (cit == slot->counter.end() || cit->second.num < 1) return 0;
Packit Service ac2942
    cit->second.wake = cit->second.num;
Packit Service ac2942
    slot->cond.broadcast();
Packit Service ac2942
    return cit->second.num;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Send wake-up signals to all threads waiting by each key.
Packit Service ac2942
   * @return the number of threads waiting for the signal.
Packit Service ac2942
   */
Packit Service ac2942
  size_t broadcast_all() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    size_t sum = 0;
Packit Service ac2942
    for (size_t i = 0; i < SLOTNUM; i++) {
Packit Service ac2942
      Slot* slot = slots_ + i;
Packit Service ac2942
      ScopedMutex lock(&slot->mutex);
Packit Service ac2942
      CountMap::iterator cit = slot->counter.begin();
Packit Service ac2942
      CountMap::iterator citend = slot->counter.end();
Packit Service ac2942
      while (cit != citend) {
Packit Service ac2942
        if (cit->second.num > 0) {
Packit Service ac2942
          cit->second.wake = cit->second.num;
Packit Service ac2942
          sum += cit->second.num;
Packit Service ac2942
        }
Packit Service ac2942
        slot->cond.broadcast();
Packit Service ac2942
        ++cit;
Packit Service ac2942
      }
Packit Service ac2942
    }
Packit Service ac2942
    return sum;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Get the total number of threads waiting for signals.
Packit Service ac2942
   * @return the total number of threads waiting for signals.
Packit Service ac2942
   */
Packit Service ac2942
  size_t count() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    size_t sum = 0;
Packit Service ac2942
    for (size_t i = 0; i < SLOTNUM; i++) {
Packit Service ac2942
      Slot* slot = slots_ + i;
Packit Service ac2942
      ScopedMutex lock(&slot->mutex);
Packit Service ac2942
      CountMap::iterator cit = slot->counter.begin();
Packit Service ac2942
      CountMap::iterator citend = slot->counter.end();
Packit Service ac2942
      while (cit != citend) {
Packit Service ac2942
        sum += cit->second.num;
Packit Service ac2942
        ++cit;
Packit Service ac2942
      }
Packit Service ac2942
    }
Packit Service ac2942
    return sum;
Packit Service ac2942
  }
Packit Service ac2942
 private:
Packit Service ac2942
  /**
Packit Service ac2942
   * Counter for waiting threads.
Packit Service ac2942
   */
Packit Service ac2942
  struct Count {
    /** The number of waiting threads. */
    size_t num;
    /** The number of waking threads; the signalling methods never raise it above `num'. */
    size_t wake;
  };
Packit Service ac2942
  /**
Packit Service ac2942
   * Slot of a key space.
Packit Service ac2942
   */
Packit Service ac2942
  struct Slot {
Packit Service ac2942
    CondVar cond;                        ///< condition variable
Packit Service ac2942
    Mutex mutex;                         ///< mutex
Packit Service ac2942
    CountMap counter;                    ///< counter
Packit Service ac2942
  };
Packit Service ac2942
  /**
Packit Service ac2942
   * Get the slot corresponding a key.
Packit Service ac2942
   * @param key the key.
Packit Service ac2942
   * @return the slot corresponding the key.
Packit Service ac2942
   */
Packit Service ac2942
  Slot* get_slot(const std::string& key) {
    // The hash of the key modulo the number of slots selects the slot.
    return &slots_[hashmurmur(key.data(), key.size()) % SLOTNUM];
  }
Packit Service ac2942
  /** The slot array. */
Packit Service ac2942
  Slot slots_[SLOTNUM];
Packit Service ac2942
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Key of thread specific data.
Packit Service ac2942
 */
Packit Service ac2942
class TSDKey {
 public:
  /**
   * Default constructor.
   */
  explicit TSDKey();
  /**
   * Constructor taking a destructor function.
   * @param dstr the destructor for the value.
   */
  explicit TSDKey(void (*dstr)(void*));
  /**
   * Destructor.
   */
  ~TSDKey();
  /**
   * Store the value.
   * @param ptr an arbitrary pointer.
   */
  void set(void* ptr);
  /**
   * Retrieve the value.
   * @return the stored pointer.
   */
  void* get() const;
 private:
  /** Opaque pointer to the internal state. */
  void* opq_;
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Smart pointer to thread specific data.
Packit Service ac2942
 */
Packit Service ac2942
template <class TYPE>
Packit Service ac2942
class TSD {
Packit Service ac2942
 public:
Packit Service ac2942
  /**
Packit Service ac2942
   * Default constructor.
Packit Service ac2942
   */
Packit Service ac2942
  explicit TSD() : key_(delete_value) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Destructor.
Packit Service ac2942
   */
Packit Service ac2942
  ~TSD() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    TYPE* obj = (TYPE*)key_.get();
Packit Service ac2942
    if (obj) {
Packit Service ac2942
      delete obj;
Packit Service ac2942
      key_.set(NULL);
Packit Service ac2942
    }
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Dereference operator.
Packit Service ac2942
   * @return the reference to the inner object.
Packit Service ac2942
   */
Packit Service ac2942
  TYPE& operator *() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    TYPE* obj = (TYPE*)key_.get();
Packit Service ac2942
    if (!obj) {
Packit Service ac2942
      obj = new TYPE;
Packit Service ac2942
      key_.set(obj);
Packit Service ac2942
    }
Packit Service ac2942
    return *obj;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Member reference operator.
Packit Service ac2942
   * @return the pointer to the inner object.
Packit Service ac2942
   */
Packit Service ac2942
  TYPE* operator ->() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    TYPE* obj = (TYPE*)key_.get();
Packit Service ac2942
    if (!obj) {
Packit Service ac2942
      obj = new TYPE;
Packit Service ac2942
      key_.set(obj);
Packit Service ac2942
    }
Packit Service ac2942
    return obj;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Cast operator to the original type.
Packit Service ac2942
   * @return the copy of the inner object.
Packit Service ac2942
   */
Packit Service ac2942
  operator TYPE() const {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    TYPE* obj = (TYPE*)key_.get();
Packit Service ac2942
    if (!obj) return TYPE();
Packit Service ac2942
    return *obj;
Packit Service ac2942
  }
Packit Service ac2942
 private:
Packit Service ac2942
  /**
Packit Service ac2942
   * Delete the inner object.
Packit Service ac2942
   * @param obj the inner object.
Packit Service ac2942
   */
Packit Service ac2942
  static void delete_value(void* obj) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    delete (TYPE*)obj;
Packit Service ac2942
  }
Packit Service ac2942
  /** Dummy constructor to forbid the use. */
Packit Service ac2942
  TSD(const TSD&);
Packit Service ac2942
  /** Dummy Operator to forbid the use. */
Packit Service ac2942
  TSD& operator =(const TSD&);
Packit Service ac2942
  /** Key of thread specific data. */
Packit Service ac2942
  TSDKey key_;
Packit Service ac2942
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Integer with atomic operations.
Packit Service ac2942
 */
Packit Service ac2942
class AtomicInt64 {
Packit Service ac2942
 public:
Packit Service ac2942
  /**
Packit Service ac2942
   * Default constructor.
Packit Service ac2942
   */
Packit Service ac2942
  explicit AtomicInt64() : value_(0), lock_() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Copy constructor.
Packit Service ac2942
   * @param src the source object.
Packit Service ac2942
   */
Packit Service ac2942
  AtomicInt64(const AtomicInt64& src) : value_(src.get()), lock_() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  };
Packit Service ac2942
  /**
Packit Service ac2942
   * Constructor.
Packit Service ac2942
   * @param num the initial value.
Packit Service ac2942
   */
Packit Service ac2942
  AtomicInt64(int64_t num) : value_(num), lock_() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Destructor.
Packit Service ac2942
   */
Packit Service ac2942
  ~AtomicInt64() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Set the new value.
Packit Service ac2942
   * @param val the new value.
Packit Service ac2942
   * @return the old value.
Packit Service ac2942
   */
Packit Service ac2942
  int64_t set(int64_t val);
Packit Service ac2942
  /**
Packit Service ac2942
   * Add a value.
Packit Service ac2942
   * @param val the additional value.
Packit Service ac2942
   * @return the old value.
Packit Service ac2942
   */
Packit Service ac2942
  int64_t add(int64_t val);
Packit Service ac2942
  /**
Packit Service ac2942
   * Perform compare-and-swap.
Packit Service ac2942
   * @param oval the old value.
Packit Service ac2942
   * @param nval the new value.
Packit Service ac2942
   * @return true on success, or false on failure.
Packit Service ac2942
   */
Packit Service ac2942
  bool cas(int64_t oval, int64_t nval);
Packit Service ac2942
  /**
Packit Service ac2942
   * Get the current value.
Packit Service ac2942
   * @return the current value.
Packit Service ac2942
   */
Packit Service ac2942
  int64_t get() const;
Packit Service ac2942
  /**
Packit Service ac2942
   * Assignment operator from the self type.
Packit Service ac2942
   * @param right the right operand.
Packit Service ac2942
   * @return the reference to itself.
Packit Service ac2942
   */
Packit Service ac2942
  AtomicInt64& operator =(const AtomicInt64& right) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    if (&right == this) return *this;
Packit Service ac2942
    set(right.get());
Packit Service ac2942
    return *this;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Assignment operator from integer.
Packit Service ac2942
   * @param right the right operand.
Packit Service ac2942
   * @return the reference to itself.
Packit Service ac2942
   */
Packit Service ac2942
  AtomicInt64& operator =(const int64_t& right) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    set(right);
Packit Service ac2942
    return *this;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Cast operator to integer.
Packit Service ac2942
   * @return the current value.
Packit Service ac2942
   */
Packit Service ac2942
  operator int64_t() const {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    return get();
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Summation assignment operator by integer.
Packit Service ac2942
   * @param right the right operand.
Packit Service ac2942
   * @return the reference to itself.
Packit Service ac2942
   */
Packit Service ac2942
  AtomicInt64& operator +=(int64_t right) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    add(right);
Packit Service ac2942
    return *this;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Subtraction assignment operator by integer.
Packit Service ac2942
   * @param right the right operand.
Packit Service ac2942
   * @return the reference to itself.
Packit Service ac2942
   */
Packit Service ac2942
  AtomicInt64& operator -=(int64_t right) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    add(-right);
Packit Service ac2942
    return *this;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Secure the least value
Packit Service ac2942
   * @param val the least value
Packit Service ac2942
   * @return the current value.
Packit Service ac2942
   */
Packit Service ac2942
  int64_t secure_least(int64_t val) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    while (true) {
Packit Service ac2942
      int64_t cur = get();
Packit Service ac2942
      if (cur >= val) return cur;
Packit Service ac2942
      if (cas(cur, val)) break;
Packit Service ac2942
    }
Packit Service ac2942
    return val;
Packit Service ac2942
  }
Packit Service ac2942
 private:
Packit Service ac2942
  /** The value. */
Packit Service ac2942
  volatile int64_t value_;
Packit Service ac2942
  /** The alternative lock. */
Packit Service ac2942
  mutable SpinLock lock_;
Packit Service ac2942
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
/**
Packit Service ac2942
 * Task queue device.
Packit Service ac2942
 */
Packit Service ac2942
class TaskQueue {
Packit Service ac2942
 public:
Packit Service ac2942
  class Task;
Packit Service ac2942
 private:
Packit Service ac2942
  class WorkerThread;
Packit Service ac2942
  /** An alias of list of tasks. */
Packit Service ac2942
  typedef std::list<Task*> TaskList;
Packit Service ac2942
 public:
Packit Service ac2942
  /**
Packit Service ac2942
   * Interface of a task.
Packit Service ac2942
   */
Packit Service ac2942
  class Task {
Packit Service ac2942
    friend class TaskQueue;
Packit Service ac2942
   public:
Packit Service ac2942
    /**
Packit Service ac2942
     * Default constructor.
Packit Service ac2942
     */
Packit Service ac2942
    explicit Task() : id_(0), thid_(0), aborted_(false) {
Packit Service ac2942
      _assert_(true);
Packit Service ac2942
    }
Packit Service ac2942
    /**
Packit Service ac2942
     * Destructor.
Packit Service ac2942
     */
Packit Service ac2942
    virtual ~Task() {
Packit Service ac2942
      _assert_(true);
Packit Service ac2942
    }
Packit Service ac2942
    /**
Packit Service ac2942
     * Get the ID number of the task.
Packit Service ac2942
     * @return the ID number of the task, which is incremented from 1.
Packit Service ac2942
     */
Packit Service ac2942
    uint64_t id() const {
Packit Service ac2942
      _assert_(true);
Packit Service ac2942
      return id_;
Packit Service ac2942
    }
Packit Service ac2942
    /**
Packit Service ac2942
     * Get the ID number of the worker thread.
Packit Service ac2942
     * @return the ID number of the worker thread.  It is from 0 to less than the number of
Packit Service ac2942
     * worker threads.
Packit Service ac2942
     */
Packit Service ac2942
    uint32_t thread_id() const {
Packit Service ac2942
      _assert_(true);
Packit Service ac2942
      return thid_;
Packit Service ac2942
    }
Packit Service ac2942
    /**
Packit Service ac2942
     * Check whether the thread is to be aborted.
Packit Service ac2942
     * @return true if the thread is to be aborted, or false if not.
Packit Service ac2942
     */
Packit Service ac2942
    bool aborted() const {
Packit Service ac2942
      _assert_(true);
Packit Service ac2942
      return aborted_;
Packit Service ac2942
    }
Packit Service ac2942
   private:
Packit Service ac2942
    /** The task ID number. */
Packit Service ac2942
    uint64_t id_;
Packit Service ac2942
    /** The thread ID number. */
Packit Service ac2942
    uint64_t thid_;
Packit Service ac2942
    /** The flag to be aborted. */
Packit Service ac2942
    bool aborted_;
Packit Service ac2942
  };
Packit Service ac2942
  /**
Packit Service ac2942
   * Default Constructor.
Packit Service ac2942
   */
Packit Service ac2942
  TaskQueue() : thary_(NULL), thnum_(0), tasks_(), count_(0), mutex_(), cond_(), seed_(0) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Destructor.
Packit Service ac2942
   */
Packit Service ac2942
  virtual ~TaskQueue() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Process a task.
Packit Service ac2942
   * @param task a task object.
Packit Service ac2942
   */
Packit Service ac2942
  virtual void do_task(Task* task) = 0;
Packit Service ac2942
  /**
Packit Service ac2942
   * Process the starting event.
Packit Service ac2942
   * @param task a task object.
Packit Service ac2942
   * @note This is called for each thread on starting.
Packit Service ac2942
   */
Packit Service ac2942
  virtual void do_start(const Task* task) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Process the finishing event.
Packit Service ac2942
   * @param task a task object.
Packit Service ac2942
   * @note This is called for each thread on finishing.
Packit Service ac2942
   */
Packit Service ac2942
  virtual void do_finish(const Task* task) {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Start the task queue.
Packit Service ac2942
   * @param thnum the number of worker threads.
Packit Service ac2942
   */
Packit Service ac2942
  void start(size_t thnum) {
Packit Service ac2942
    _assert_(thnum > 0 && thnum <= MEMMAXSIZ);
Packit Service ac2942
    thary_ = new WorkerThread[thnum];
Packit Service ac2942
    for (size_t i = 0; i < thnum; i++) {
Packit Service ac2942
      thary_[i].id_ = i;
Packit Service ac2942
      thary_[i].queue_ = this;
Packit Service ac2942
      thary_[i].start();
Packit Service ac2942
    }
Packit Service ac2942
    thnum_ = thnum;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Finish the task queue.
Packit Service ac2942
   * @note This function blocks until all tasks in the queue are popped.
Packit Service ac2942
   */
Packit Service ac2942
  void finish() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    mutex_.lock();
Packit Service ac2942
    TaskList::iterator it = tasks_.begin();
Packit Service ac2942
    TaskList::iterator itend = tasks_.end();
Packit Service ac2942
    while (it != itend) {
Packit Service ac2942
      Task* task = *it;
Packit Service ac2942
      task->aborted_ = true;
Packit Service ac2942
      ++it;
Packit Service ac2942
    }
Packit Service ac2942
    cond_.broadcast();
Packit Service ac2942
    mutex_.unlock();
Packit Service ac2942
    Thread::yield();
Packit Service ac2942
    for (double wsec = 1.0 / CLOCKTICK; true; wsec *= 2) {
Packit Service ac2942
      mutex_.lock();
Packit Service ac2942
      if (tasks_.empty()) {
Packit Service ac2942
        mutex_.unlock();
Packit Service ac2942
        break;
Packit Service ac2942
      }
Packit Service ac2942
      mutex_.unlock();
Packit Service ac2942
      if (wsec > 1.0) wsec = 1.0;
Packit Service ac2942
      Thread::sleep(wsec);
Packit Service ac2942
    }
Packit Service ac2942
    mutex_.lock();
Packit Service ac2942
    for (size_t i = 0; i < thnum_; i++) {
Packit Service ac2942
      thary_[i].aborted_ = true;
Packit Service ac2942
    }
Packit Service ac2942
    cond_.broadcast();
Packit Service ac2942
    mutex_.unlock();
Packit Service ac2942
    for (size_t i = 0; i < thnum_; i++) {
Packit Service ac2942
      thary_[i].join();
Packit Service ac2942
    }
Packit Service ac2942
    delete[] thary_;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Add a task.
Packit Service ac2942
   * @param task a task object.
Packit Service ac2942
   * @return the number of tasks in the queue.
Packit Service ac2942
   */
Packit Service ac2942
  int64_t add_task(Task* task) {
Packit Service ac2942
    _assert_(task);
Packit Service ac2942
    mutex_.lock();
Packit Service ac2942
    task->id_ = ++seed_;
Packit Service ac2942
    tasks_.push_back(task);
Packit Service ac2942
    int64_t count = ++count_;
Packit Service ac2942
    cond_.signal();
Packit Service ac2942
    mutex_.unlock();
Packit Service ac2942
    return count;
Packit Service ac2942
  }
Packit Service ac2942
  /**
Packit Service ac2942
   * Get the number of tasks in the queue.
Packit Service ac2942
   * @return the number of tasks in the queue.
Packit Service ac2942
   */
Packit Service ac2942
  int64_t count() {
Packit Service ac2942
    _assert_(true);
Packit Service ac2942
    mutex_.lock();
Packit Service ac2942
    int64_t count = count_;
Packit Service ac2942
    mutex_.unlock();
Packit Service ac2942
    return count;
Packit Service ac2942
  }
Packit Service ac2942
 private:
Packit Service ac2942
  /**
Packit Service ac2942
   * Implementation of the worker thread.
Packit Service ac2942
   */
Packit Service ac2942
  class WorkerThread : public Thread {
Packit Service ac2942
    friend class TaskQueue;
Packit Service ac2942
   public:
Packit Service ac2942
    explicit WorkerThread() : id_(0), queue_(NULL), aborted_(false) {
Packit Service ac2942
      _assert_(true);
Packit Service ac2942
    }
Packit Service ac2942
   private:
Packit Service ac2942
    void run() {
Packit Service ac2942
      _assert_(true);
Packit Service ac2942
      Task* stask = new Task;
Packit Service ac2942
      stask->thid_ = id_;
Packit Service ac2942
      queue_->do_start(stask);
Packit Service ac2942
      delete stask;
Packit Service ac2942
      bool empty = false;
Packit Service ac2942
      while (true) {
Packit Service ac2942
        queue_->mutex_.lock();
Packit Service ac2942
        if (aborted_) {
Packit Service ac2942
          queue_->mutex_.unlock();
Packit Service ac2942
          break;
Packit Service ac2942
        }
Packit Service ac2942
        if (empty) queue_->cond_.wait(&queue_->mutex_, 1.0);
Packit Service ac2942
        Task * task = NULL;
Packit Service ac2942
        if (queue_->tasks_.empty()) {
Packit Service ac2942
          empty = true;
Packit Service ac2942
        } else {
Packit Service ac2942
          task = queue_->tasks_.front();
Packit Service ac2942
          task->thid_ = id_;
Packit Service ac2942
          queue_->tasks_.pop_front();
Packit Service ac2942
          queue_->count_--;
Packit Service ac2942
          empty = false;
Packit Service ac2942
        }
Packit Service ac2942
        queue_->mutex_.unlock();
Packit Service ac2942
        if (task) queue_->do_task(task);
Packit Service ac2942
      }
Packit Service ac2942
      Task* ftask = new Task;
Packit Service ac2942
      ftask->thid_ = id_;
Packit Service ac2942
      ftask->aborted_ = true;
Packit Service ac2942
      queue_->do_finish(ftask);
Packit Service ac2942
      delete ftask;
Packit Service ac2942
    }
Packit Service ac2942
    uint32_t id_;
Packit Service ac2942
    TaskQueue* queue_;
Packit Service ac2942
    Task* task_;
Packit Service ac2942
    bool aborted_;
Packit Service ac2942
  };
Packit Service ac2942
  /** Dummy constructor to forbid the use. */
Packit Service ac2942
  TaskQueue(const TaskQueue&);
Packit Service ac2942
  /** Dummy Operator to forbid the use. */
Packit Service ac2942
  TaskQueue& operator =(const TaskQueue&);
Packit Service ac2942
  /** The array of worker threads. */
Packit Service ac2942
  WorkerThread* thary_;
Packit Service ac2942
  /** The number of worker threads. */
Packit Service ac2942
  size_t thnum_;
Packit Service ac2942
  /** The list of tasks. */
Packit Service ac2942
  TaskList tasks_;
Packit Service ac2942
  /** The number of the tasks. */
Packit Service ac2942
  int64_t count_;
Packit Service ac2942
  /** The mutex for the task list. */
Packit Service ac2942
  Mutex mutex_;
Packit Service ac2942
  /** The condition variable for the task list. */
Packit Service ac2942
  CondVar cond_;
Packit Service ac2942
  /** The seed of ID numbers. */
Packit Service ac2942
  uint64_t seed_;
Packit Service ac2942
};
Packit Service ac2942
Packit Service ac2942
Packit Service ac2942
}                                        // common namespace
Packit Service ac2942
Packit Service ac2942
#endif                                   // duplication check
Packit Service ac2942
Packit Service ac2942
// END OF FILE