/*
* Copyright 2016-2018, Intel Corporation
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* obj_cpp_cond_var.cpp -- cpp condition variable test
*/
#include "unittest.hpp"
#include <libpmemobj++/condition_variable.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
#define LAYOUT "cpp"
namespace nvobj = pmem::obj;
namespace
{
/*
* Due to premature wake-ups with the TIMEDOUT status on the Windows platform,
* an "epsilon" tolerance has been introduced where appropriate.
*/
/* convenience typedef */
typedef std::function<void(nvobj::persistent_ptr<struct root>)> reader_type;
/* pool root structure */
struct root {
nvobj::mutex pmutex;
nvobj::condition_variable cond;
unsigned counter;
};
/* the number of threads */
const unsigned num_threads = 30;
/* notification limit */
const unsigned limit = 7000;
/* cond wait time in milliseconds */
const std::chrono::milliseconds wait_time(150);
/*
* Premature wake-up tolerance.
* XXX Windows - this needs to be investigated, it shouldn't timeout this long
* before the actual timeout.
*/
const auto epsilon = std::chrono::milliseconds(16);
/*
* write_notify -- (internal) bump up the counter up to a limit and notify
*/
void
write_notify(nvobj::persistent_ptr<struct root> proot, bool notify, bool all)
{
std::lock_guard<nvobj::mutex> lock(proot->pmutex);
while (proot->counter < limit)
proot->counter++;
if (!notify)
return;
if (all)
proot->cond.notify_all();
else
proot->cond.notify_one();
}
/*
* reader_mutex -- (internal) verify the counter value
*/
void
reader_mutex(nvobj::persistent_ptr<struct root> proot)
{
proot->pmutex.lock();
while (proot->counter != limit)
proot->cond.wait(proot->pmutex);
UT_ASSERTeq(proot->counter, limit);
proot->pmutex.unlock();
}
/*
* reader_mutex_pred -- (internal) verify the counter value
*/
void
reader_mutex_pred(nvobj::persistent_ptr<struct root> proot)
{
proot->pmutex.lock();
proot->cond.wait(proot->pmutex,
[&]() { return proot->counter == limit; });
UT_ASSERTeq(proot->counter, limit);
proot->pmutex.unlock();
}
/*
* reader_lock -- (internal) verify the counter value
*/
void
reader_lock(nvobj::persistent_ptr<struct root> proot)
{
std::unique_lock<nvobj::mutex> lock(proot->pmutex);
while (proot->counter != limit)
proot->cond.wait(lock);
UT_ASSERTeq(proot->counter, limit);
lock.unlock();
}
/*
* reader_lock_pred -- (internal) verify the counter value
*/
void
reader_lock_pred(nvobj::persistent_ptr<struct root> proot)
{
std::unique_lock<nvobj::mutex> lock(proot->pmutex);
proot->cond.wait(lock, [&]() { return proot->counter == limit; });
UT_ASSERTeq(proot->counter, limit);
lock.unlock();
}
/*
* reader_mutex_until -- (internal) verify the counter value or timeout
*/
void
reader_mutex_until(nvobj::persistent_ptr<struct root> proot)
{
proot->pmutex.lock();
auto until = std::chrono::system_clock::now();
until += wait_time;
auto ret = proot->cond.wait_until(proot->pmutex, until);
auto now = std::chrono::system_clock::now();
if (ret == std::cv_status::timeout) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(
until - now);
if (now < until)
UT_ASSERT(diff < epsilon);
} else {
UT_ASSERTeq(proot->counter, limit);
}
proot->pmutex.unlock();
}
/*
* reader_mutex_until_pred -- (internal) verify the counter value or timeout
*/
void
reader_mutex_until_pred(nvobj::persistent_ptr<struct root> proot)
{
proot->pmutex.lock();
auto until = std::chrono::system_clock::now();
until += wait_time;
auto ret = proot->cond.wait_until(proot->pmutex, until, [&]() {
return proot->counter == limit;
});
auto now = std::chrono::system_clock::now();
if (ret == false) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(
until - now);
if (now < until)
UT_ASSERT(diff < epsilon);
} else {
UT_ASSERTeq(proot->counter, limit);
}
proot->pmutex.unlock();
}
/*
* reader_lock_until -- (internal) verify the counter value or timeout
*/
void
reader_lock_until(nvobj::persistent_ptr<struct root> proot)
{
std::unique_lock<nvobj::mutex> lock(proot->pmutex);
auto until = std::chrono::system_clock::now();
until += wait_time;
auto ret = proot->cond.wait_until(lock, until);
auto now = std::chrono::system_clock::now();
if (ret == std::cv_status::timeout) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(
until - now);
if (now < until)
UT_ASSERT(diff < epsilon);
} else {
UT_ASSERTeq(proot->counter, limit);
}
lock.unlock();
}
/*
* reader_lock_until_pred -- (internal) verify the counter value or timeout
*/
void
reader_lock_until_pred(nvobj::persistent_ptr<struct root> proot)
{
std::unique_lock<nvobj::mutex> lock(proot->pmutex);
auto until = std::chrono::system_clock::now();
until += wait_time;
auto ret = proot->cond.wait_until(
lock, until, [&]() { return proot->counter == limit; });
auto now = std::chrono::system_clock::now();
if (ret == false) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(
until - now);
if (now < until)
UT_ASSERT(diff < epsilon);
} else {
UT_ASSERTeq(proot->counter, limit);
}
lock.unlock();
}
/*
* reader_mutex_for -- (internal) verify the counter value or timeout
*/
void
reader_mutex_for(nvobj::persistent_ptr<struct root> proot)
{
proot->pmutex.lock();
auto until = std::chrono::system_clock::now();
until += wait_time;
auto ret = proot->cond.wait_for(proot->pmutex, wait_time);
auto now = std::chrono::system_clock::now();
if (ret == std::cv_status::timeout) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(
until - now);
if (now < until)
UT_ASSERT(diff < epsilon);
} else {
UT_ASSERTeq(proot->counter, limit);
}
proot->pmutex.unlock();
}
/*
* reader_mutex_for_pred -- (internal) verify the counter value or timeout
*/
void
reader_mutex_for_pred(nvobj::persistent_ptr<struct root> proot)
{
proot->pmutex.lock();
auto until = std::chrono::system_clock::now();
until += wait_time;
auto ret = proot->cond.wait_for(proot->pmutex, wait_time, [&]() {
return proot->counter == limit;
});
auto now = std::chrono::system_clock::now();
if (ret == false) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(
until - now);
if (now < until)
UT_ASSERT(diff < epsilon);
} else {
UT_ASSERTeq(proot->counter, limit);
}
proot->pmutex.unlock();
}
/*
* reader_lock_for -- (internal) verify the counter value or timeout
*/
void
reader_lock_for(nvobj::persistent_ptr<struct root> proot)
{
std::unique_lock<nvobj::mutex> lock(proot->pmutex);
auto until = std::chrono::system_clock::now();
until += wait_time;
auto ret = proot->cond.wait_for(lock, wait_time);
auto now = std::chrono::system_clock::now();
if (ret == std::cv_status::timeout) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(
until - now);
if (now < until)
UT_ASSERT(diff < epsilon);
} else {
UT_ASSERTeq(proot->counter, limit);
}
lock.unlock();
}
/*
* reader_lock_for_pred -- (internal) verify the counter value or timeout
*/
void
reader_lock_for_pred(nvobj::persistent_ptr<struct root> proot)
{
std::unique_lock<nvobj::mutex> lock(proot->pmutex);
auto until = std::chrono::system_clock::now();
until += wait_time;
auto ret = proot->cond.wait_for(
lock, wait_time, [&]() { return proot->counter == limit; });
auto now = std::chrono::system_clock::now();
if (ret == false) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(
until - now);
if (now < until)
UT_ASSERT(diff < epsilon);
} else {
UT_ASSERTeq(proot->counter, limit);
}
lock.unlock();
}
/*
* cond_zero_test -- (internal) test the zeroing constructor
*/
void
cond_zero_test(nvobj::pool<struct root> &pop)
{
PMEMoid raw_cnd;
pmemobj_alloc(pop.handle(), &raw_cnd, sizeof(PMEMcond), 1,
[](PMEMobjpool *pop, void *ptr, void *) -> int {
PMEMcond *mtx = static_cast<PMEMcond *>(ptr);
pmemobj_memset_persist(pop, mtx, 1, sizeof(*mtx));
return 0;
},
nullptr);
nvobj::condition_variable *placed_mtx =
new (pmemobj_direct(raw_cnd)) nvobj::condition_variable;
std::unique_lock<nvobj::mutex> lock(pop.root()->pmutex);
placed_mtx->wait_for(lock, wait_time, []() { return false; });
}
/*
* mutex_test -- (internal) launch worker threads to test the pshared_mutex
*/
template <typename Reader, typename Writer>
void
mutex_test(nvobj::pool<struct root> &pop, bool notify, bool notify_all,
const Reader &writer, const Writer &reader)
{
const auto total_threads = num_threads * 2u;
std::thread threads[total_threads];
nvobj::persistent_ptr<struct root> proot = pop.root();
for (unsigned i = 0; i < total_threads; i += 2) {
threads[i] = std::thread(reader, proot);
threads[i + 1] = std::thread(writer, proot, notify, notify_all);
}
for (unsigned i = 0; i < total_threads; ++i)
threads[i].join();
}
}
int
main(int argc, char *argv[])
{
START();
if (argc != 2)
UT_FATAL("usage: %s file-name", argv[0]);
const char *path = argv[1];
nvobj::pool<struct root> pop;
try {
pop = nvobj::pool<struct root>::create(
path, LAYOUT, PMEMOBJ_MIN_POOL, S_IWUSR | S_IRUSR);
} catch (pmem::pool_error &pe) {
UT_FATAL("!pool::create: %s %s", pe.what(), path);
}
cond_zero_test(pop);
std::vector<reader_type> notify_functions(
{reader_mutex, reader_mutex_pred, reader_lock, reader_lock_pred,
reader_mutex_until, reader_mutex_until_pred, reader_lock_until,
reader_lock_until_pred, reader_mutex_for,
reader_mutex_for_pred, reader_lock_for, reader_lock_for_pred});
for (auto func : notify_functions) {
unsigned reset_value = 42;
mutex_test(pop, true, false, write_notify, func);
pop.root()->counter = reset_value;
mutex_test(pop, true, true, write_notify, func);
pop.root()->counter = reset_value;
}
std::vector<reader_type> not_notify_functions(
{reader_mutex_until, reader_mutex_until_pred, reader_lock_until,
reader_lock_until_pred, reader_mutex_for,
reader_mutex_for_pred, reader_lock_for, reader_lock_for_pred});
for (auto func : not_notify_functions) {
unsigned reset_value = 42;
mutex_test(pop, false, false, write_notify, func);
pop.root()->counter = reset_value;
mutex_test(pop, false, true, write_notify, func);
pop.root()->counter = reset_value;
}
/* pmemcheck related persist */
pmemobj_persist(pop.handle(), &(pop.root()->counter),
sizeof(pop.root()->counter));
pop.close();
return 0;
}