/*
 * Copyright 2016-2018, Intel Corporation
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *
 *     * Neither the name of the copyright holder nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * obj_cpp_cond_var.cpp -- cpp condition variable test
 *
 * Pairs of reader and writer threads share one persistent root object.
 * Writers raise root::counter to `limit` and optionally notify; readers wait
 * on root::cond using each wait flavour in turn and check what they observe.
 */

#include "unittest.hpp"

/*
 * NOTE(review): the seven directives below carry no header name in this copy
 * of the file, and template argument lists are missing further down (for
 * example in the reader_type typedef and the static_cast/duration_cast
 * calls). The text looks like it lost every angle-bracket span; restore
 * those from the upstream file before building.
 */
#include
#include
#include
#include
#include
#include
#include

/* pool layout name handed to pool::create() in main() */
#define LAYOUT "cpp"

namespace nvobj = pmem::obj;

namespace
{

/*
 * Due to premature wake-ups with the TIMEDOUT status on the Windows platform,
 * an "epsilon" tolerance has been introduced where appropriate: a timed wait
 * that reports a timeout slightly before its deadline is still accepted.
 */

/*
 * convenience typedef -- the common type used in main() to store every
 * reader_* function in one container
 */
typedef std::function)> reader_type;

/* pool root structure shared by all reader and writer threads */
struct root {
	/* guards counter and is the lock passed to every cond wait */
	nvobj::mutex pmutex;
	/* condition variable under test */
	nvobj::condition_variable cond;
	/* value the writers raise to `limit` and the readers check */
	unsigned counter;
};

/* the number of reader/writer thread pairs started by mutex_test() */
const unsigned num_threads = 30;

/* notification limit -- the counter value the readers wait for */
const unsigned limit = 7000;

/* cond wait time in milliseconds */
const std::chrono::milliseconds wait_time(150);

/*
 * Premature wake-up tolerance.
 * XXX Windows - this needs to be investigated, it shouldn't timeout this long
 * before the actual timeout.
 */
const auto epsilon = std::chrono::milliseconds(16);

/*
 * write_notify -- (internal) bump up the counter up to a limit and notify
 *
 * proot  - root object holding the mutex, the condition variable and counter
 * notify - when false, the counter is updated but nobody is notified
 * all    - selects notify_all() (true) or notify_one() (false)
 */
void
write_notify(nvobj::persistent_ptr proot, bool notify, bool all)
{
	/* held until return, so the notification is sent with the lock taken */
	std::lock_guard lock(proot->pmutex);

	while (proot->counter < limit)
		proot->counter++;

	if (!notify)
		return;

	if (all)
		proot->cond.notify_all();
	else
		proot->cond.notify_one();
}

/*
 * reader_mutex -- (internal) verify the counter value
 *
 * Locks the pmem mutex directly and waits, without a timeout, in an explicit
 * loop until the counter equals the limit.
 */
void
reader_mutex(nvobj::persistent_ptr proot)
{
	proot->pmutex.lock();

	/* re-check after every wake-up; wait() is given the raw mutex */
	while (proot->counter != limit)
		proot->cond.wait(proot->pmutex);

	UT_ASSERTeq(proot->counter, limit);
	proot->pmutex.unlock();
}

/*
 * reader_mutex_pred -- (internal) verify the counter value
 *
 * Same as reader_mutex, but the loop condition is passed to wait() as a
 * predicate.
 */
void
reader_mutex_pred(nvobj::persistent_ptr proot)
{
	proot->pmutex.lock();

	proot->cond.wait(proot->pmutex,
			 [&]() { return proot->counter == limit; });

	UT_ASSERTeq(proot->counter, limit);
	proot->pmutex.unlock();
}

/*
 * reader_lock -- (internal) verify the counter value
 *
 * Waits without a timeout through a std::unique_lock wrapper, looping
 * explicitly until the counter equals the limit.
 */
void
reader_lock(nvobj::persistent_ptr proot)
{
	std::unique_lock lock(proot->pmutex);

	while (proot->counter != limit)
		proot->cond.wait(lock);

	UT_ASSERTeq(proot->counter, limit);
	lock.unlock();
}

/*
 * reader_lock_pred -- (internal) verify the counter value
 *
 * Same as reader_lock, but the loop condition is passed to wait() as a
 * predicate.
 */
void
reader_lock_pred(nvobj::persistent_ptr proot)
{
	std::unique_lock lock(proot->pmutex);

	proot->cond.wait(lock, [&]() { return proot->counter == limit; });

	UT_ASSERTeq(proot->counter, limit);
	lock.unlock();
}

/*
 * reader_mutex_until -- (internal) verify the counter value or timeout
 *
 * Waits on the raw mutex until an absolute deadline of now + wait_time.
 * A reported timeout that arrives before the deadline is accepted only when
 * the remaining time is below epsilon; any other wake-up must see the counter
 * at the limit.
 */
void
reader_mutex_until(nvobj::persistent_ptr proot)
{
	proot->pmutex.lock();

	auto until = std::chrono::system_clock::now();
	until += wait_time;
	auto ret = proot->cond.wait_until(proot->pmutex, until);

	auto now = std::chrono::system_clock::now();
	if (ret == std::cv_status::timeout) {
		/* time still left to the deadline; only used if now < until */
		auto diff = std::chrono::duration_cast(
			until - now);
		if (now < until)
			UT_ASSERT(diff < epsilon);
	} else {
		UT_ASSERTeq(proot->counter, limit);
	}

	proot->pmutex.unlock();
}

/*
 * reader_mutex_until_pred -- (internal) verify the counter value or timeout
 *
 * Predicate flavour of reader_mutex_until. A false result is handled as a
 * timeout (presumably mirroring std::condition_variable, where false means
 * the predicate was still unsatisfied at the deadline -- confirm against the
 * nvobj::condition_variable documentation).
 */
void
reader_mutex_until_pred(nvobj::persistent_ptr proot)
{
	proot->pmutex.lock();

	auto until = std::chrono::system_clock::now();
	until += wait_time;
	auto ret = proot->cond.wait_until(proot->pmutex, until, [&]() {
		return proot->counter == limit;
	});

	auto now = std::chrono::system_clock::now();
	if (ret == false) {
		/* time still left to the deadline; only used if now < until */
		auto diff = std::chrono::duration_cast(
			until - now);
		if (now < until)
			UT_ASSERT(diff < epsilon);
	} else {
		UT_ASSERTeq(proot->counter, limit);
	}

	proot->pmutex.unlock();
}

/*
 * reader_lock_until -- (internal) verify the counter value or timeout
 *
 * Same checks as reader_mutex_until, with the wait performed through a
 * std::unique_lock instead of the raw mutex.
 */
void
reader_lock_until(nvobj::persistent_ptr proot)
{
	std::unique_lock lock(proot->pmutex);

	auto until = std::chrono::system_clock::now();
	until += wait_time;
	auto ret = proot->cond.wait_until(lock, until);

	auto now = std::chrono::system_clock::now();
	if (ret == std::cv_status::timeout) {
		/* time still left to the deadline; only used if now < until */
		auto diff = std::chrono::duration_cast(
			until - now);
		if (now < until)
			UT_ASSERT(diff < epsilon);
	} else {
		UT_ASSERTeq(proot->counter, limit);
	}

	lock.unlock();
}

/*
 * reader_lock_until_pred -- (internal) verify the counter value or timeout
 *
 * Predicate flavour of reader_lock_until; a false result is handled as a
 * timeout.
 */
void
reader_lock_until_pred(nvobj::persistent_ptr proot)
{
	std::unique_lock lock(proot->pmutex);

	auto until = std::chrono::system_clock::now();
	until += wait_time;
	auto ret = proot->cond.wait_until(
		lock, until, [&]() { return proot->counter == limit; });

	auto now = std::chrono::system_clock::now();
	if (ret == false) {
		/* time still left to the deadline; only used if now < until */
		auto diff = std::chrono::duration_cast(
			until - now);
		if (now < until)
			UT_ASSERT(diff < epsilon);
	} else {
		UT_ASSERTeq(proot->counter, limit);
	}

	lock.unlock();
}

/*
 * reader_mutex_for -- (internal) verify the counter value or timeout
 *
 * Waits on the raw mutex for the relative duration wait_time. The `until`
 * deadline is computed locally and is not passed to the wait; it only serves
 * to judge whether a reported timeout came too early.
 */
void
reader_mutex_for(nvobj::persistent_ptr proot)
{
	proot->pmutex.lock();

	auto until = std::chrono::system_clock::now();
	until += wait_time;
	auto ret = proot->cond.wait_for(proot->pmutex, wait_time);

	auto now = std::chrono::system_clock::now();
	if (ret == std::cv_status::timeout) {
		/* time still left to the deadline; only used if now < until */
		auto diff = std::chrono::duration_cast(
			until - now);
		if (now < until)
			UT_ASSERT(diff < epsilon);
	} else {
		UT_ASSERTeq(proot->counter, limit);
	}

	proot->pmutex.unlock();
}

/*
 * reader_mutex_for_pred -- (internal) verify the counter value or timeout
 *
 * Predicate flavour of reader_mutex_for; a false result is handled as a
 * timeout.
 */
void
reader_mutex_for_pred(nvobj::persistent_ptr proot)
{
	proot->pmutex.lock();

	auto until = std::chrono::system_clock::now();
	until += wait_time;
	auto ret = proot->cond.wait_for(proot->pmutex, wait_time, [&]() {
		return proot->counter == limit;
	});

	auto now = std::chrono::system_clock::now();
	if (ret == false) {
		/* time still left to the deadline; only used if now < until */
		auto diff = std::chrono::duration_cast(
			until - now);
		if (now < until)
			UT_ASSERT(diff < epsilon);
	} else {
		UT_ASSERTeq(proot->counter, limit);
	}

	proot->pmutex.unlock();
}

/*
 * reader_lock_for -- (internal) verify the counter value or timeout
 *
 * Same checks as reader_mutex_for, with the wait performed through a
 * std::unique_lock instead of the raw mutex.
 */
void
reader_lock_for(nvobj::persistent_ptr proot)
{
	std::unique_lock lock(proot->pmutex);

	auto until = std::chrono::system_clock::now();
	until += wait_time;
	auto ret = proot->cond.wait_for(lock, wait_time);

	auto now = std::chrono::system_clock::now();
	if (ret == std::cv_status::timeout) {
		/* time still left to the deadline; only used if now < until */
		auto diff = std::chrono::duration_cast(
			until - now);
		if (now < until)
			UT_ASSERT(diff < epsilon);
	} else {
		UT_ASSERTeq(proot->counter, limit);
	}

	lock.unlock();
}

/*
 * reader_lock_for_pred -- (internal) verify the counter value or timeout
 *
 * Predicate flavour of reader_lock_for; a false result is handled as a
 * timeout.
 */
void
reader_lock_for_pred(nvobj::persistent_ptr proot)
{
	std::unique_lock lock(proot->pmutex);

	auto until = std::chrono::system_clock::now();
	until += wait_time;
	auto ret = proot->cond.wait_for(
		lock, wait_time, [&]() { return proot->counter == limit; });

	auto now = std::chrono::system_clock::now();
	if (ret == false) {
		/* time still left to the deadline; only used if now < until */
		auto diff = std::chrono::duration_cast(
			until - now);
		if (now < until)
			UT_ASSERT(diff < epsilon);
	} else {
		UT_ASSERTeq(proot->counter, limit);
	}

	lock.unlock();
}

/*
 * cond_zero_test -- (internal) test the zeroing constructor
 *
 * Allocates a PMEMcond-sized object whose bytes are all set to 1, constructs
 * a condition_variable over that memory with placement new, and then runs a
 * timed wait on it. Presumably the condition_variable constructor is expected
 * to reset the dirty memory so that the wait works -- confirm against the
 * library.
 *
 * NOTE(review): the result of pmemobj_alloc() is not checked, and the locals
 * named mtx/placed_mtx actually refer to condition variable objects.
 */
void
cond_zero_test(nvobj::pool &pop)
{
	PMEMoid raw_cnd;

	/*
	 * The capture-less lambda is the allocation constructor callback;
	 * its own `pop` parameter has the same name as the function argument.
	 */
	pmemobj_alloc(pop.handle(), &raw_cnd, sizeof(PMEMcond), 1,
		      [](PMEMobjpool *pop, void *ptr, void *) -> int {
			      PMEMcond *mtx = static_cast(ptr);
			      /* fill with a non-zero pattern on purpose */
			      pmemobj_memset_persist(pop, mtx, 1, sizeof(*mtx));
			      return 0;
		      },
		      nullptr);

	nvobj::condition_variable *placed_mtx =
		new (pmemobj_direct(raw_cnd)) nvobj::condition_variable;

	std::unique_lock lock(pop.root()->pmutex);

	/* the predicate never holds, so nothing here ends the wait early */
	placed_mtx->wait_for(lock, wait_time, []() { return false; });
}

/*
 * mutex_test -- (internal) launch worker threads to test the condition
 * variable
 *
 * Starts num_threads reader/writer pairs on the pool's root object and joins
 * them all. Readers occupy the even slots of the thread array and are called
 * with the root pointer only; writers occupy the odd slots and additionally
 * receive the notify and notify_all flags.
 *
 * NOTE(review): the type names are crossed -- `writer` is declared with type
 * Reader and `reader` with type Writer. The value names match how each
 * callable is used below.
 */
template
void
mutex_test(nvobj::pool &pop, bool notify, bool notify_all,
	   const Reader &writer, const Writer &reader)
{
	const auto total_threads = num_threads * 2u;
	std::thread threads[total_threads];

	nvobj::persistent_ptr proot = pop.root();

	/* one reader and one writer per iteration */
	for (unsigned i = 0; i < total_threads; i += 2) {
		threads[i] = std::thread(reader, proot);
		threads[i + 1] = std::thread(writer, proot, notify, notify_all);
	}

	for (unsigned i = 0; i < total_threads; ++i)
		threads[i].join();
}
} /* anonymous namespace */

/*
 * main -- test entry point
 *
 * Expects exactly one argument: the path of the pool file to create. Runs
 * cond_zero_test() first, then every reader function against notifying
 * writers, and finally the timed reader functions against writers that do
 * not notify.
 */
int
main(int argc, char *argv[])
{
	START();

	if (argc != 2)
		UT_FATAL("usage: %s file-name", argv[0]);

	const char *path = argv[1];

	nvobj::pool pop;

	try {
		pop = nvobj::pool::create(
			path, LAYOUT, PMEMOBJ_MIN_POOL, S_IWUSR | S_IRUSR);
	} catch (pmem::pool_error &pe) {
		UT_FATAL("!pool::create: %s %s", pe.what(), path);
	}

	cond_zero_test(pop);

	/* all twelve wait flavours take part in the notifying runs */
	std::vector notify_functions(
		{reader_mutex, reader_mutex_pred, reader_lock,
		 reader_lock_pred, reader_mutex_until,
		 reader_mutex_until_pred, reader_lock_until,
		 reader_lock_until_pred, reader_mutex_for,
		 reader_mutex_for_pred, reader_lock_for,
		 reader_lock_for_pred});

	for (auto func : notify_functions) {
		/* any value below `limit`, so the next run has work to do */
		unsigned reset_value = 42;

		/* writers use notify_one() */
		mutex_test(pop, true, false, write_notify, func);
		pop.root()->counter = reset_value;

		/* writers use notify_all() */
		mutex_test(pop, true, true, write_notify, func);
		pop.root()->counter = reset_value;
	}

	/*
	 * Only the timed flavours are listed for the runs without any
	 * notification (presumably because they are the ones that can give
	 * up waiting on their own).
	 */
	std::vector not_notify_functions(
		{reader_mutex_until, reader_mutex_until_pred,
		 reader_lock_until, reader_lock_until_pred, reader_mutex_for,
		 reader_mutex_for_pred, reader_lock_for,
		 reader_lock_for_pred});

	for (auto func : not_notify_functions) {
		unsigned reset_value = 42;

		/*
		 * notify is false in both runs, so write_notify() returns
		 * before it ever looks at the notify_all flag
		 */
		mutex_test(pop, false, false, write_notify, func);
		pop.root()->counter = reset_value;

		mutex_test(pop, false, true, write_notify, func);
		pop.root()->counter = reset_value;
	}

	/* pmemcheck related persist */
	pmemobj_persist(pop.handle(), &(pop.root()->counter),
			sizeof(pop.root()->counter));

	pop.close();

	return 0;
}