/************************************************************************************************* * Database extension * Copyright (C) 2009-2012 FAL Labs * This file is part of Kyoto Cabinet. * This program is free software: you can redistribute it and/or modify it under the terms of * the GNU General Public License as published by the Free Software Foundation, either version * 3 of the License, or any later version. * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU General Public License for more details. * You should have received a copy of the GNU General Public License along with this program. * If not, see . *************************************************************************************************/ #ifndef _KCDBEXT_H // duplication check #define _KCDBEXT_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace kyotocabinet { // common namespace /** * MapReduce framework. * @note Although this framework is not distributed or concurrent, it is useful for aggregate * calculation with less CPU loading and less memory usage. */ class MapReduce { public: class ValueIterator; private: class FlushThread; class ReduceTaskQueue; class MapVisitor; struct MergeLine; /** An alias of vector of loaded values. */ typedef std::vector Values; /** The default number of temporary databases. */ static const size_t DEFDBNUM = 8; /** The maxinum number of temporary databases. */ static const size_t MAXDBNUM = 256; /** The default cache limit. */ static const int64_t DEFCLIM = 512LL << 20; /** The default cache bucket numer. */ static const int64_t DEFCBNUM = 1048583LL; /** The bucket number of temprary databases. */ static const int64_t DBBNUM = 512LL << 10; /** The page size of temprary databases. 
*/
  static const int32_t DBPSIZ = 32768;
  /** The mapped size of temporary databases. */
  static const int64_t DBMSIZ = 516LL * 4096;
  /** The page cache capacity of temporary databases. */
  static const int64_t DBPCCAP = 16LL << 20;
  /** The default number of threads in parallel mode. */
  static const size_t DEFTHNUM = 8;
  /** The number of slots of the record lock. */
  static const int32_t RLOCKSLOT = 256;
 public:
  /**
   * Value iterator for the reducer.
   * @note Every element of the loaded value list is a concatenation of entries, each made of
   * a variable length number giving the size followed by that many bytes of payload.
   */
  class ValueIterator {
    friend class MapReduce;
   public:
    /**
     * Get the next value.
     * @param sp the pointer to the variable into which the size of the region of the return
     * value is assigned.
     * @return the pointer to the next value region, or NULL if no value remains.
     */
    const char* next(size_t* sp) {
      _assert_(sp);
      if (vptr_ == NULL) {
        // the previous chunk is used up: switch to the next loaded chunk, if any
        if (vit_ == vend_) return NULL;
        vptr_ = vit_->data();
        vsiz_ = vit_->size();
        ++vit_;
      }
      // decode the size prefix, then hand out the payload which follows it
      uint64_t len;
      const size_t hsiz = readvarnum(vptr_, vsiz_, &len);
      const char* const body = vptr_ + hsiz;
      *sp = len;
      vptr_ = body + len;
      vsiz_ -= hsiz;
      vsiz_ -= len;
      if (vsiz_ < 1) vptr_ = NULL;
      return body;
    }
   private:
    /**
     * Constructor.
     * @param vit the beginning iterator of loaded values.
     * @param vend the ending iterator of loaded values.
     */
    explicit ValueIterator(Values::const_iterator vit, Values::const_iterator vend) :
        vit_(vit), vend_(vend), vptr_(NULL), vsiz_(0) {
      _assert_(true);
    }
    /**
     * Destructor.
     */
    ~ValueIterator() {
      _assert_(true);
    }
    /** Dummy constructor to forbid the use. */
    ValueIterator(const ValueIterator&);
    /** Dummy Operator to forbid the use. */
    ValueIterator& operator =(const ValueIterator&);
    /** The current iterator of loaded values. */
    Values::const_iterator vit_;
    /** The ending iterator of loaded values. */
    Values::const_iterator vend_;
    /** The read position inside the current chunk, or NULL if no chunk is being read. */
    const char* vptr_;
    /** The number of unread bytes of the current chunk. */
    size_t vsiz_;
  };
  /**
   * Execution options.
*/ enum Option { XNOLOCK = 1 << 0, ///< avoid locking against update operations XPARAMAP = 1 << 1, ///< run mappers in parallel XPARARED = 1 << 2, ///< run reducers in parallel XPARAFLS = 1 << 3, ///< run cache flushers in parallel XNOCOMP = 1 << 8 ///< avoid compression of temporary databases }; /** * Default constructor. */ explicit MapReduce() : db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0), mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), flsthnum_(DEFTHNUM), cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), flsths_(NULL), redtasks_(NULL), redaborted_(false), rlocks_(NULL) { _assert_(true); } /** * Destructor. */ virtual ~MapReduce() { _assert_(true); } /** * Map a record data. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param vbuf the pointer to the value region. * @param vsiz the size of the value region. * @return true on success, or false on failure. * @note This function can call the MapReduce::emit method to emit a record. To avoid * deadlock, any explicit database operation must not be performed in this function. */ virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) = 0; /** * Reduce a record data. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param iter the iterator to get the values. * @return true on success, or false on failure. * @note To avoid deadlock, any explicit database operation must not be performed in this * function. */ virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0; /** * Preprocess the map operations. * @return true on success, or false on failure. * @note This function can call the MapReduce::emit method to emit a record. To avoid * deadlock, any explicit database operation must not be performed in this function. */ virtual bool preprocess() { _assert_(true); return true; } /** * Mediate between the map and the reduce phases. 
* @return true on success, or false on failure.
   * @note This function can call the MapReduce::emit method to emit a record.  To avoid
   * deadlock, any explicit database operation must not be performed in this function.  The
   * default implementation does nothing and succeeds.
   */
  virtual bool midprocess() {
    _assert_(true);
    return true;
  }
  /**
   * Postprocess the reduce operations.
   * @return true on success, or false on failure.
   * @note To avoid deadlock, any explicit database operation must not be performed in this
   * function.  The default implementation does nothing and succeeds.
   */
  virtual bool postprocess() {
    _assert_(true);
    return true;
  }
  /**
   * Process a log message.
   * @param name the name of the event.
   * @param message a supplement message.
   * @return true on success, or false on failure.
   * @note The default implementation discards the message and succeeds.
   */
  virtual bool log(const char* name, const char* message) {
    _assert_(name && message);
    return true;
  }
  /**
   * Execute the MapReduce process about a database.
   * @param db the source database.
   * @param tmppath the path of a directory for the temporary data storage.  If it is an empty
   * string, temporary data are handled on memory.
   * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking
   * against update operations by other threads, MapReduce::XPARAMAP to run the mapper in
   * parallel, MapReduce::XPARARED to run the reducer in parallel, MapReduce::XPARAFLS to run
   * the cache flushers in parallel, MapReduce::XNOCOMP to avoid compression of temporary
   * databases.
   * @return true on success, or false on failure.
*/ bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) { int64_t count = db->count(); if (count < 0) { if (db->error() != BasicDB::Error::NOIMPL) return false; count = 0; } bool err = false; double stime, etime; db_ = db; rcomp_ = LEXICALCOMP; BasicDB* idb = db; if (typeid(*db) == typeid(PolyDB)) { PolyDB* pdb = (PolyDB*)idb; idb = pdb->reveal_inner_db(); } const std::type_info& info = typeid(*idb); if (info == typeid(GrassDB)) { GrassDB* gdb = (GrassDB*)idb; rcomp_ = gdb->rcomp(); } else if (info == typeid(TreeDB)) { TreeDB* tdb = (TreeDB*)idb; rcomp_ = tdb->rcomp(); } else if (info == typeid(ForestDB)) { ForestDB* fdb = (ForestDB*)idb; rcomp_ = fdb->rcomp(); } tmpdbs_ = new BasicDB*[dbnum_]; if (tmppath.empty()) { if (!logf("prepare", "started to open temporary databases on memory")) err = true; stime = time(); for (size_t i = 0; i < dbnum_; i++) { GrassDB* gdb = new GrassDB; int32_t myopts = 0; if (!(opts & XNOCOMP)) myopts |= GrassDB::TCOMPRESS; gdb->tune_options(myopts); gdb->tune_buckets(DBBNUM / 2); gdb->tune_page(DBPSIZ); gdb->tune_page_cache(DBPCCAP); gdb->tune_comparator(rcomp_); gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE); tmpdbs_[i] = gdb; } etime = time(); if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime)) err = true; if (err) { delete[] tmpdbs_; return false; } } else { File::Status sbuf; if (!File::status(tmppath, &sbuf) || !sbuf.isdir) { db->set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory"); delete[] tmpdbs_; return false; } if (!logf("prepare", "started to open temporary databases under %s", tmppath.c_str())) err = true; stime = time(); uint32_t pid = getpid() & UINT16MAX; uint32_t tid = Thread::hash() & UINT16MAX; uint32_t ts = time() * 1000; for (size_t i = 0; i < dbnum_; i++) { std::string childpath = strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct", tmppath.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR); TreeDB* tdb = 
new TreeDB; int32_t myopts = TreeDB::TSMALL | TreeDB::TLINEAR; if (!(opts & XNOCOMP)) myopts |= TreeDB::TCOMPRESS; tdb->tune_options(myopts); tdb->tune_buckets(DBBNUM); tdb->tune_page(DBPSIZ); tdb->tune_map(DBMSIZ); tdb->tune_page_cache(DBPCCAP); tdb->tune_comparator(rcomp_); if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) { const BasicDB::Error& e = tdb->error(); db->set_error(_KCCODELINE_, e.code(), e.message()); err = true; } tmpdbs_[i] = tdb; } etime = time(); if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime)) err = true; if (err) { for (size_t i = 0; i < dbnum_; i++) { delete tmpdbs_[i]; } delete[] tmpdbs_; return false; } } if (opts & XPARARED) redtasks_ = new ReduceTaskQueue; if (opts & XPARAFLS) flsths_ = new std::deque; if (opts & XNOLOCK) { MapChecker mapchecker; MapVisitor mapvisitor(this, &mapchecker, count); mapvisitor.visit_before(); if (!err) { BasicDB::Cursor* cur = db->cursor(); if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = true; while (!err) { if (!cur->accept(&mapvisitor, false, true)) { if (cur->error() != BasicDB::Error::NOREC) err = true; break; } } delete cur; } if (mapvisitor.error()) { db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"); err = true; } mapvisitor.visit_after(); } else if (opts & XPARAMAP) { MapChecker mapchecker; MapVisitor mapvisitor(this, &mapchecker, count); rlocks_ = new SlottedMutex(RLOCKSLOT); if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) { db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"); err = true; } delete rlocks_; rlocks_ = NULL; if (mapvisitor.error()) err = true; } else { MapChecker mapchecker; MapVisitor mapvisitor(this, &mapchecker, count); if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true; if (mapvisitor.error()) { db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"); err = true; } } if (flsths_) { delete flsths_; flsths_ = 
NULL; } if (redtasks_) { delete redtasks_; redtasks_ = NULL; } if (!logf("clean", "closing the temporary databases")) err = true; stime = time(); for (size_t i = 0; i < dbnum_; i++) { const std::string& path = tmpdbs_[i]->path(); if (!tmpdbs_[i]->clear()) { const BasicDB::Error& e = tmpdbs_[i]->error(); db->set_error(_KCCODELINE_, e.code(), e.message()); err = true; } if (!tmpdbs_[i]->close()) { const BasicDB::Error& e = tmpdbs_[i]->error(); db->set_error(_KCCODELINE_, e.code(), e.message()); err = true; } if (!tmppath.empty()) File::remove(path); delete tmpdbs_[i]; } etime = time(); if (!logf("clean", "closing the temporary databases finished: time=%.6f", etime - stime)) err = true; delete[] tmpdbs_; return !err; } /** * Set the storage configurations. * @param dbnum the number of temporary databases. * @param clim the limit size of the internal cache. * @param cbnum the bucket number of the internal cache. */ void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { _assert_(true); dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; clim_ = clim > 0 ? clim : DEFCLIM; cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); } /** * Set the thread configurations. * @param mapthnum the number of threads for the mapper. * @param redthnum the number of threads for the reducer. * @param flsthnum the number of threads for the internal flusher. */ void tune_thread(int32_t mapthnum, int32_t redthnum, int32_t flsthnum) { _assert_(true); mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM; redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM; flsthnum_ = flsthnum > 0 ? flsthnum : DEFTHNUM; } protected: /** * Emit a record from the mapper. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param vbuf the pointer to the value region. * @param vsiz the size of the value region. * @return true on success, or false on failure. 
*/ bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); bool err = false; size_t rsiz = sizevarnum(vsiz) + vsiz; char stack[NUMBUFSIZ*4]; char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; char* wp = rbuf; wp += writevarnum(rbuf, vsiz); std::memcpy(wp, vbuf, vsiz); if (rlocks_) { size_t bidx = TinyHashMap::hash_record(kbuf, ksiz) % cbnum_; size_t lidx = bidx % RLOCKSLOT; rlocks_->lock(lidx); cache_->append(kbuf, ksiz, rbuf, rsiz); rlocks_->unlock(lidx); } else { cache_->append(kbuf, ksiz, rbuf, rsiz); } if (rbuf != stack) delete[] rbuf; csiz_ += sizevarnum(ksiz) + ksiz + rsiz; return !err; } private: /** * Cache flusher. */ class FlushThread : public Thread { public: /** constructor */ explicit FlushThread(MapReduce* mr, BasicDB* tmpdb, TinyHashMap* cache, size_t csiz, bool cown) : mr_(mr), tmpdb_(tmpdb), cache_(cache), csiz_(csiz), cown_(cown), err_(false) {} /** perform the concrete process */ void run() { if (!mr_->logf("map", "started to flushing the cache: count=%lld size=%lld", (long long)cache_->count(), (long long)csiz_)) err_ = true; double stime = time(); BasicDB* tmpdb = tmpdb_; TinyHashMap* cache = cache_; bool cown = cown_; TinyHashMap::Sorter sorter(cache); const char* kbuf, *vbuf; size_t ksiz, vsiz; while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { const BasicDB::Error& e = tmpdb->error(); mr_->db_->set_error(_KCCODELINE_, e.code(), e.message()); err_ = true; } sorter.step(); if (cown) cache->remove(kbuf, ksiz); } double etime = time(); if (!mr_->logf("map", "flushing the cache finished: time=%.6f", etime - stime)) err_ = true; if (cown) delete cache; } /** check the error flag. 
*/ bool error() { return err_; } private: MapReduce* mr_; ///< driver BasicDB* tmpdb_; ///< temprary database TinyHashMap* cache_; ///< cache for emitter size_t csiz_; ///< current cache size bool cown_; ///< cache ownership flag bool err_; ///< error flag }; /** * Task queue for parallel reducer. */ class ReduceTaskQueue : public TaskQueue { public: /** * Task for parallel reducer. */ class ReduceTask : public Task { friend class ReduceTaskQueue; public: /** constructor */ explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, const Values& values) : mr_(mr), key_(kbuf, ksiz), values_(values) {} private: MapReduce* mr_; ///< driver std::string key_; ///< key Values values_; ///< values }; /** constructor */ explicit ReduceTaskQueue() {} private: /** process a task */ void do_task(Task* task) { ReduceTask* rtask = (ReduceTask*)task; ValueIterator iter(rtask->values_.begin(), rtask->values_.end()); if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter)) rtask->mr_->redaborted_ = true; delete rtask; } }; /** * Checker for the map process. */ class MapChecker : public BasicDB::ProgressChecker { public: /** constructor */ explicit MapChecker() : stop_(false) {} /** stop the process */ void stop() { stop_ = true; } /** check whether stopped */ bool stopped() { return stop_; } private: /** check whether stopped */ bool check(const char* name, const char* message, int64_t curcnt, int64_t allcnt) { return !stop_; } bool stop_; ///< flag for stop }; /** * Visitor for the map process. 
*/ class MapVisitor : public BasicDB::Visitor { public: /** constructor */ explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) : mr_(mr), checker_(checker), scale_(scale), stime_(0), err_(false) {} /** get the error flag */ bool error() { return err_; } /** preprocess the mappter */ void visit_before() { mr_->dbclock_ = 0; mr_->cache_ = new TinyHashMap(mr_->cbnum_); mr_->csiz_ = 0; if (!mr_->preprocess()) err_ = true; if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; if (!mr_->logf("map", "started the map process: scale=%lld", (long long)scale_)) err_ = true; stime_ = time(); } /** postprocess the mappter and call the reducer */ void visit_after() { if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; double etime = time(); if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_)) err_ = true; if (!mr_->midprocess()) err_ = true; if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; delete mr_->cache_; if (mr_->flsths_ && !mr_->flsths_->empty()) { std::deque::iterator flthit = mr_->flsths_->begin(); std::deque::iterator flthitend = mr_->flsths_->end(); while (flthit != flthitend) { FlushThread* flth = *flthit; flth->join(); if (flth->error()) err_ = true; delete flth; ++flthit; } } if (!err_ && !mr_->execute_reduce()) err_ = true; if (!mr_->postprocess()) err_ = true; } private: /** visit a record */ const char* visit_full(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz, size_t* sp) { if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { checker_->stop(); err_ = true; } if (mr_->rlocks_) { if (mr_->csiz_ >= mr_->clim_) { mr_->rlocks_->lock_all(); if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { checker_->stop(); err_ = true; } mr_->rlocks_->unlock_all(); } } else { if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { checker_->stop(); err_ = true; } } return NOP; } MapReduce* mr_; ///< driver MapChecker* checker_; ///< checker int64_t scale_; ///< number of records double stime_; ///< start time bool err_; 
///< error flag }; /** * Front line of a merging list. */ struct MergeLine { BasicDB::Cursor* cur; ///< cursor Comparator* rcomp; ///< record comparator char* kbuf; ///< pointer to the key size_t ksiz; ///< size of the key const char* vbuf; ///< pointer to the value size_t vsiz; ///< size of the value /** comparing operator */ bool operator <(const MergeLine& right) const { return rcomp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0; } }; /** * Process a log message. * @param name the name of the event. * @param format the printf-like format string. * @param ... used according to the format string. * @return true on success, or false on failure. */ bool logf(const char* name, const char* format, ...) { _assert_(name && format); va_list ap; va_start(ap, format); std::string message; vstrprintf(&message, format, ap); va_end(ap); return log(name, message.c_str()); } /** * Flush all cache records. * @return true on success, or false on failure. */ bool flush_cache() { _assert_(true); bool err = false; BasicDB* tmpdb = tmpdbs_[dbclock_]; dbclock_ = (dbclock_ + 1) % dbnum_; if (flsths_) { size_t num = flsths_->size(); if (num >= flsthnum_ || num >= dbnum_) { FlushThread* flth = flsths_->front(); flsths_->pop_front(); flth->join(); if (flth->error()) err = true; delete flth; } FlushThread* flth = new FlushThread(this, tmpdb, cache_, csiz_, true); cache_ = new TinyHashMap(cbnum_); csiz_ = 0; flth->start(); flsths_->push_back(flth); } else { FlushThread flth(this, tmpdb, cache_, csiz_, false); flth.run(); if (flth.error()) err = true; cache_->clear(); csiz_ = 0; } return !err; } /** * Execute the reduce part. * @return true on success, or false on failure. 
*/
  bool execute_reduce() {
    bool err = false;
    int64_t scale = 0;
    for (size_t i = 0; i < dbnum_; i++) {
      scale += tmpdbs_[i]->count();
    }
    if (!logf("reduce", "started the reduce process: scale=%lld", (long long)scale)) err = true;
    double stime = time();
    if (redtasks_) redtasks_->start(redthnum_);
    // merge the temporary databases: the queue always yields the line with the smallest key
    // because MergeLine inverts its comparison
    std::priority_queue<MergeLine> lines;
    for (size_t i = 0; i < dbnum_; i++) {
      MergeLine line;
      line.cur = tmpdbs_[i]->cursor();
      line.rcomp = rcomp_;
      line.cur->jump();
      line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
      if (line.kbuf) {
        lines.push(line);
      } else {
        delete line.cur;
      }
    }
    char* lkbuf = NULL;
    size_t lksiz = 0;
    Values values;
    while (!err && !lines.empty()) {
      MergeLine line = lines.top();
      lines.pop();
      // a different key means that all values of the previous key have been gathered
      if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lksiz))) {
        if (!call_reducer(lkbuf, lksiz, values)) {
          db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed");
          err = true;
        }
        values.clear();
      }
      delete[] lkbuf;
      lkbuf = line.kbuf;
      lksiz = line.ksiz;
      values.push_back(std::string(line.vbuf, line.vsiz));
      line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
      if (line.kbuf) {
        lines.push(line);
      } else {
        delete line.cur;
      }
    }
    if (lkbuf) {
      if (!err && !call_reducer(lkbuf, lksiz, values)) {
        db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed");
        err = true;
      }
      delete[] lkbuf;
    }
    // release whatever is left after an early exit
    while (!lines.empty()) {
      MergeLine line = lines.top();
      lines.pop();
      delete[] line.kbuf;
      delete line.cur;
    }
    if (redtasks_) {
      redtasks_->finish();
      // a task may have failed after the last call of call_reducer, which is the only other
      // place where the abort flag is examined (assuming finish() has waited for the workers)
      if (redaborted_ && !err) {
        db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed");
        err = true;
      }
    }
    double etime = time();
    if (!logf("reduce", "the reduce process finished: time=%.6f", etime - stime)) err = true;
    return !err;
  }
  /**
   * Call the reducer.
   * @param kbuf the pointer to the key region.
   * @param ksiz the size of the key region.
   * @param values a vector of the values.
   * @return true on success, or false on failure.
*/ bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { _assert_(kbuf && ksiz <= MEMMAXSIZ); if (redtasks_) { if (redaborted_) return false; ReduceTaskQueue::ReduceTask* task = new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values); redtasks_->add_task(task); return true; } bool err = false; ValueIterator iter(values.begin(), values.end()); if (!reduce(kbuf, ksiz, &iter)) err = true; return !err; } /** Dummy constructor to forbid the use. */ MapReduce(const MapReduce&); /** Dummy Operator to forbid the use. */ MapReduce& operator =(const MapReduce&); /** The internal database. */ BasicDB* db_; /** The record comparator. */ Comparator* rcomp_; /** The temporary databases. */ BasicDB** tmpdbs_; /** The number of temporary databases. */ size_t dbnum_; /** The logical clock for temporary databases. */ int64_t dbclock_; /** The number of the mapper threads. */ size_t mapthnum_; /** The number of the reducer threads. */ size_t redthnum_; /** The number of the flusher threads. */ size_t flsthnum_; /** The cache for emitter. */ TinyHashMap* cache_; /** The current size of the cache for emitter. */ AtomicInt64 csiz_; /** The limit size of the cache for emitter. */ int64_t clim_; /** The bucket number of the cache for emitter. */ int64_t cbnum_; /** The flush threads. */ std::deque* flsths_; /** The task queue for parallel reducer. */ TaskQueue* redtasks_; /** The flag whether aborted. */ bool redaborted_; /** The whole lock. */ SlottedMutex* rlocks_; }; /** * Index database. * @note This class is designed to implement an indexing storage with an efficient appending * operation for the existing record values. This class is a wrapper of the polymorphic * database, featuring buffering mechanism to alleviate IO overhead in the database layer. This * class can be inherited but overwriting methods is forbidden. 
Before every database operation,
 * it is necessary to call the IndexDB::open method in order to open a database file and connect
 * the database object to it.  To avoid data missing or corruption, it is important to close
 * every database file by the IndexDB::close method when the database is no longer in use.  It
 * is forbidden for multiple database objects in a process to open the same database at the same
 * time.  It is forbidden to share a database object with child processes.
 */
class IndexDB {
 private:
  /** The default number of temporary databases. */
  static const size_t DEFDBNUM = 8;
  /** The maximum number of temporary databases. */
  static const size_t MAXDBNUM = 256;
  /** The default cache limit size. */
  static const int64_t DEFCLIM = 256LL << 20;
  /** The default cache bucket number. */
  static const int64_t DEFCBNUM = 1048583LL;
  /** The bucket number of temporary databases. */
  static const int64_t DBBNUM = 512LL << 10;
  /** The page size of temporary databases. */
  static const int32_t DBPSIZ = 32768;
  /** The mapped size of temporary databases. */
  static const int64_t DBMSIZ = 516LL * 4096;
  /** The page cache capacity of temporary databases. */
  static const int64_t DBPCCAP = 16LL << 20;
 public:
  /**
   * Default constructor.
   * @note The object starts in the closed state, that is, with the open mode of zero.
   */
  explicit IndexDB() :
      mlock_(), db_(), omode_(0), rcomp_(NULL), tmppath_(""), tmpdbs_(NULL),
      dbnum_(DEFDBNUM), dbclock_(0), cache_(NULL), csiz_(0), clim_(0) {
    _assert_(true);
  }
  /**
   * Destructor.
   * @note If the database is not closed, it is closed implicitly.
   */
  virtual ~IndexDB() {
    _assert_(true);
    if (omode_ != 0) close();
  }
  /**
   * Get the last happened error.
   * @return the last happened error, as kept by the internal database.
   */
  BasicDB::Error error() const {
    _assert_(true);
    return db_.error();
  }
  /**
   * Set the error information.
   * @param file the file name of the program source code.
   * @param line the line number of the program source code.
   * @param func the function name of the program source code.
   * @param code an error code.
   * @param message a supplement message.
*/
  void set_error(const char* file, int32_t line, const char* func,
                 BasicDB::Error::Code code, const char* message) {
    _assert_(file && line > 0 && func && message);
    // the error is stored in the internal database, where IndexDB::error reads it back
    db_.set_error(file, line, func, code, message);
  }
  /**
   * Set the error information without source code information.
   * @param code an error code.
   * @param message a supplement message.
   * @note The source code location recorded is the one of this method itself.
   */
  void set_error(BasicDB::Error::Code code, const char* message) {
    _assert_(message);
    db_.set_error(_KCCODELINE_, code, message);
  }
  /**
   * Open a database file.
   * @param path the path of a database file.  The same as with PolyDB.  In addition, the
   * following tuning parameters are supported.  "idxclim" (or "idxcachelimit") specifies the
   * limit size of the internal cache.  "idxcbnum" (or "idxcachebuckets") specifies the bucket
   * number of the internal cache.  "idxdbnum" specifies the number of internal databases; if it
   * is omitted, no internal database is used.  "idxtmppath" specifies the path of the temporary
   * directory.
   * @param mode the connection mode.  The same as with PolyDB.
   * @return true on success, or false on failure.
   * @note Every opened database must be closed by the IndexDB::close method when it is no longer
   * in use.  It is not allowed for two or more database objects in the same process to keep
   * their connections to the same database file at the same time.
*/ bool open(const std::string& path = ":", uint32_t mode = BasicDB::OWRITER | BasicDB::OCREATE) { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "already opened"); return false; } std::vector elems; strsplit(path, '#', &elems); int64_t clim = 0; int64_t cbnum = 0; size_t dbnum = 0; std::string tmppath = ""; std::vector::iterator it = elems.begin(); std::vector::iterator itend = elems.end(); if (it != itend) ++it; while (it != itend) { std::vector fields; if (strsplit(*it, '=', &fields) > 1) { const char* key = fields[0].c_str(); const char* value = fields[1].c_str(); if (!std::strcmp(key, "idxclim") || !std::strcmp(key, "idxcachelimit")) { clim = atoix(value); } else if (!std::strcmp(key, "idxcbnum") || !std::strcmp(key, "idxcachebuckets")) { cbnum = atoix(value); } else if (!std::strcmp(key, "idxdbnum")) { dbnum = atoix(value); } else if (!std::strcmp(key, "idxtmppath")) { tmppath = value; } } ++it; } if (!db_.open(path, mode)) return false; tmppath_ = tmppath; rcomp_ = LEXICALCOMP; BasicDB* idb = &db_; if (typeid(db_) == typeid(PolyDB)) { PolyDB* pdb = (PolyDB*)idb; idb = pdb->reveal_inner_db(); } const std::type_info& info = typeid(*idb); if (info == typeid(GrassDB)) { GrassDB* gdb = (GrassDB*)idb; rcomp_ = gdb->rcomp(); } else if (info == typeid(TreeDB)) { TreeDB* tdb = (TreeDB*)idb; rcomp_ = tdb->rcomp(); } else if (info == typeid(ForestDB)) { ForestDB* fdb = (ForestDB*)idb; rcomp_ = fdb->rcomp(); } dbnum_ = dbnum < MAXDBNUM ? 
dbnum : MAXDBNUM; dbclock_ = 0; if ((mode & BasicDB::OWRITER) && dbnum > 0) { tmpdbs_ = new BasicDB*[dbnum_]; if (tmppath_.empty()) { report(_KCCODELINE_, "started to open temporary databases on memory"); double stime = time(); for (size_t i = 0; i < dbnum_; i++) { GrassDB* gdb = new GrassDB; gdb->tune_options(GrassDB::TCOMPRESS); gdb->tune_buckets(DBBNUM / 2); gdb->tune_page(DBPSIZ); gdb->tune_page_cache(DBPCCAP); gdb->tune_comparator(rcomp_); gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE); tmpdbs_[i] = gdb; } double etime = time(); report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime); } else { File::Status sbuf; if (!File::status(tmppath_, &sbuf) || !sbuf.isdir) { set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory"); delete[] tmpdbs_; db_.close(); return false; } report(_KCCODELINE_, "started to open temporary databases under %s", tmppath.c_str()); double stime = time(); uint32_t pid = getpid() & UINT16MAX; uint32_t tid = Thread::hash() & UINT16MAX; uint32_t ts = time() * 1000; bool err = false; for (size_t i = 0; i < dbnum_; i++) { std::string childpath = strprintf("%s%cidx-%04x-%04x-%08x-%03d%ckct", tmppath_.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR); TreeDB* tdb = new TreeDB; tdb->tune_options(TreeDB::TSMALL | TreeDB::TLINEAR); tdb->tune_buckets(DBBNUM); tdb->tune_page(DBPSIZ); tdb->tune_map(DBMSIZ); tdb->tune_page_cache(DBPCCAP); tdb->tune_comparator(rcomp_); if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) { const BasicDB::Error& e = tdb->error(); set_error(_KCCODELINE_, e.code(), e.message()); err = true; } tmpdbs_[i] = tdb; } double etime = time(); report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime); if (err) { for (size_t i = 0; i < dbnum_; i++) { delete tmpdbs_[i]; } delete[] tmpdbs_; db_.close(); return false; } } } else { tmpdbs_ = NULL; } if (mode & BasicDB::OWRITER) { cache_ = new 
TinyHashMap(cbnum > 0 ? cbnum : DEFCBNUM); } else { cache_ = NULL; } clim_ = clim > 0 ? clim : DEFCLIM; csiz_ = 0; omode_ = mode; return true; } /** * Close the database file. * @return true on success, or false on failure. */ bool close() { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); return false; } bool err = false; if (cache_) { if (!flush_cache()) err = true; delete cache_; if (tmpdbs_) { if (!merge_tmpdbs()) err = true; report(_KCCODELINE_, "closing the temporary databases"); double stime = time(); for (size_t i = 0; i < dbnum_; i++) { BasicDB* tmpdb = tmpdbs_[i]; const std::string& path = tmpdb->path(); if (!tmpdb->close()) { const BasicDB::Error& e = tmpdb->error(); set_error(_KCCODELINE_, e.code(), e.message()); err = true; } if (!tmppath_.empty()) File::remove(path); delete tmpdb; } double etime = time(); report(_KCCODELINE_, "closing the temporary databases finished: %.6f", etime - stime); delete[] tmpdbs_; } } if (!db_.close()) err = true; omode_ = 0; return !err; } /** * Set the value of a record. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param vbuf the pointer to the value region. * @param vsiz the size of the value region. * @return true on success, or false on failure. * @note If no record corresponds to the key, a new record is created. If the corresponding * record exists, the value is overwritten. 
*/ bool set(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); return false; } if (!cache_) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); return false; } bool err = false; if (!clean_dbs(kbuf, ksiz)) err = true; cache_->set(kbuf, ksiz, vbuf, vsiz); csiz_ += ksiz + vsiz; if (csiz_ > clim_ && !flush_cache()) err = false; return !err; } /** * Set the value of a record. * @note Equal to the original DB::set method except that the parameters are std::string. */ bool set(const std::string& key, const std::string& value) { _assert_(true); return set(key.c_str(), key.size(), value.c_str(), value.size()); } /** * Add a record. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param vbuf the pointer to the value region. * @param vsiz the size of the value region. * @return true on success, or false on failure. * @note If no record corresponds to the key, a new record is created. If the corresponding * record exists, the record is not modified and false is returned. */ bool add(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); return false; } if (!cache_) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); return false; } if (check_impl(kbuf, ksiz)) { set_error(_KCCODELINE_, BasicDB::Error::DUPREC, "record duplication"); return false; } bool err = false; cache_->set(kbuf, ksiz, vbuf, vsiz); csiz_ += ksiz + vsiz; if (csiz_ > clim_ && !flush_cache()) err = false; return !err; } /** * Set the value of a record. * @note Equal to the original DB::add method except that the parameters are std::string. 
*/ bool add(const std::string& key, const std::string& value) { _assert_(true); return add(key.c_str(), key.size(), value.c_str(), value.size()); } /** * Replace the value of a record. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param vbuf the pointer to the value region. * @param vsiz the size of the value region. * @return true on success, or false on failure. * @note If no record corresponds to the key, no new record is created and false is returned. * If the corresponding record exists, the value is modified. */ bool replace(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); return false; } if (!cache_) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); return false; } if (!check_impl(kbuf, ksiz)) { set_error(_KCCODELINE_, BasicDB::Error::NOREC, "no record"); return false; } bool err = false; if (!clean_dbs(kbuf, ksiz)) err = true; cache_->set(kbuf, ksiz, vbuf, vsiz); csiz_ += ksiz + vsiz; if (csiz_ > clim_ && !flush_cache()) err = false; return !err; } /** * Replace the value of a record. * @note Equal to the original DB::replace method except that the parameters are std::string. */ bool replace(const std::string& key, const std::string& value) { _assert_(true); return replace(key.c_str(), key.size(), value.c_str(), value.size()); } /** * Append the value of a record. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param vbuf the pointer to the value region. * @param vsiz the size of the value region. * @return true on success, or false on failure. * @note If no record corresponds to the key, a new record is created. If the corresponding * record exists, the given value is appended at the end of the existing value. 
*/ bool append(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); return false; } if (!cache_) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); return false; } bool err = false; cache_->append(kbuf, ksiz, vbuf, vsiz); csiz_ += ksiz + vsiz; if (csiz_ > clim_ && !flush_cache()) err = false; return !err; } /** * Set the value of a record. * @note Equal to the original DB::append method except that the parameters are std::string. */ bool append(const std::string& key, const std::string& value) { _assert_(true); return append(key.c_str(), key.size(), value.c_str(), value.size()); } /** * Remove a record. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @return true on success, or false on failure. * @note If no record corresponds to the key, false is returned. */ bool remove(const char* kbuf, size_t ksiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); return false; } if (!cache_) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); return false; } bool err = false; if (!clean_dbs(kbuf, ksiz)) err = true; cache_->remove(kbuf, ksiz); return !err; } /** * Remove a record. * @note Equal to the original DB::remove method except that the parameter is std::string. */ bool remove(const std::string& key) { _assert_(true); return remove(key.c_str(), key.size()); } /** * Retrieve the value of a record. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param sp the pointer to the variable into which the size of the region of the return * value is assigned. * @return the pointer to the value region of the corresponding record, or NULL on failure. 
* @note If no record corresponds to the key, NULL is returned.  Because an additional zero
   * code is appended at the end of the region of the return value, the return value can be
   * treated as a C-style string.  Because the region of the return value is allocated with
   * the new[] operator, it should be released with the delete[] operator when it is no longer
   * in use.
   */
  char* get(const char* kbuf, size_t ksiz, size_t* sp) {
    _assert_(kbuf && ksiz <= MEMMAXSIZ && sp);
    ScopedRWLock lock(&mlock_, false);
    if (omode_ == 0) {
      set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
      *sp = 0;
      return NULL;
    }
    // without a cache there is nothing pending, so the internal database alone has the value
    if (!cache_) return db_.get(kbuf, ksiz, sp);
    // collect the fragments of the value from the internal database, the cache and every
    // temporary database
    size_t dvsiz = 0;
    char* dvbuf = db_.get(kbuf, ksiz, &dvsiz);
    size_t cvsiz = 0;
    const char* cvbuf = cache_->get(kbuf, ksiz, &cvsiz);
    struct Record {
      char* buf;
      size_t size;
    };
    Record* recs = NULL;
    bool hit = false;
    size_t rsiz = 0;
    if (tmpdbs_) {
      recs = new Record[dbnum_];
      for (size_t i = 0; i < dbnum_; i++) {
        BasicDB* tmpdb = tmpdbs_[i];
        Record* rp = recs + i;
        rp->buf = tmpdb->get(kbuf, ksiz, &rp->size);
        if (rp->buf) {
          rsiz += rp->size;
          hit = true;
        }
      }
    }
    if (!hit) {
      // no temporary database has a fragment: only the database and the cache matter
      delete[] recs;
      if (!dvbuf && !cvbuf) {
        *sp = 0;
        return NULL;
      }
      if (!dvbuf) {
        // the cache region is not owned by the caller, so it is copied into a new region
        char* rbuf = new char[cvsiz+1];
        std::memcpy(rbuf, cvbuf, cvsiz);
        rbuf[cvsiz] = '\0';
        *sp = cvsiz;
        return rbuf;
      }
      if (!cvbuf) {
        // the region from the internal database is handed over as it is; presumably it is
        // already terminated by db_.get -- confirm against PolyDB::get
        *sp = dvsiz;
        return dvbuf;
      }
      char* rbuf = new char[dvsiz+cvsiz+1];
      std::memcpy(rbuf, dvbuf, dvsiz);
      std::memcpy(rbuf + dvsiz, cvbuf, cvsiz);
      rbuf[dvsiz+cvsiz] = '\0';
      delete[] dvbuf;
      *sp = dvsiz + cvsiz;
      return rbuf;
    }
    if (dvbuf) rsiz += dvsiz;
    if (cvbuf) rsiz += cvsiz;
    char* rbuf = new char[rsiz+1];
    char* wp = rbuf;
    // NOTE(review): the fragments are concatenated in the order of the internal database, the
    // cache and the temporary databases; whether this matches the order of the append
    // operations should be confirmed.
    if (dvbuf) {
      std::memcpy(wp, dvbuf, dvsiz);
      wp += dvsiz;
      delete[] dvbuf;
    }
    if (cvbuf) {
      std::memcpy(wp, cvbuf, cvsiz);
      wp += cvsiz;
    }
    if (recs) {
      for (size_t i = 0; i < dbnum_; i++) {
        Record* rp = recs + i;
        if (rp->buf) {
          std::memcpy(wp, rp->buf, rp->size);
          wp += rp->size;
          delete[] rp->buf;
        }
      }
      delete[] recs;
    }
    // append the zero code promised by the contract
    *wp = '\0';
    *sp = rsiz;
    return rbuf;
  }
  /**
   * Retrieve the value of a record.
* @note Equal to the original DB::get method except that the first parameters is the key * string and the second parameter is a string to contain the result and the return value is * bool for success. */ bool get(const std::string& key, std::string* value) { _assert_(value); size_t vsiz; char* vbuf = get(key.c_str(), key.size(), &vsiz); if (!vbuf) return false; value->clear(); value->append(vbuf, vsiz); delete[] vbuf; return true; } /** * Synchronize updated contents with the file and the device. * @param hard true for physical synchronization with the device, or false for logical * synchronization with the file system. * @param proc a postprocessor object. If it is NULL, no postprocessing is performed. * @return true on success, or false on failure. * @note The operation of the postprocessor is performed atomically and other threads accessing * the same record are blocked. To avoid deadlock, any explicit database operation must not * be performed in this function. */ bool synchronize(bool hard = false, BasicDB::FileProcessor* proc = NULL) { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); return false; } if (!cache_) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); return false; } bool err = false; if (!flush_cache()) err = true; if (tmpdbs_ && !merge_tmpdbs()) err = true; if (!db_.synchronize(hard, proc)) err = true; return !err; } /** * Remove all records. * @return true on success, or false on failure. */ bool clear() { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); return false; } if (!cache_) { set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); return false; } cache_->clear(); csiz_ = 0; return db_.clear(); } /** * Get the number of records. * @return the number of records, or -1 on failure. 
*/ int64_t count() { _assert_(true); ScopedRWLock lock(&mlock_, false); return count_impl(); } /** * Get the size of the database file. * @return the size of the database file in bytes, or -1 on failure. */ int64_t size() { _assert_(true); ScopedRWLock lock(&mlock_, false); return size_impl(); } /** * Get the path of the database file. * @return the path of the database file, or an empty string on failure. */ std::string path() { _assert_(true); return db_.path(); } /** * Get the miscellaneous status information. * @param strmap a string map to contain the result. * @return true on success, or false on failure. */ bool status(std::map* strmap) { _assert_(strmap); return db_.status(strmap); } /** * Reveal the inner database object. * @return the inner database object, or NULL on failure. */ PolyDB* reveal_inner_db() { _assert_(true); return &db_; } /** * Create a cursor object. * @return the return value is the created cursor object. * @note Because the object of the return value is allocated by the constructor, it should be * released with the delete operator when it is no longer in use. */ BasicDB::Cursor* cursor() { _assert_(true); return db_.cursor(); } /** * Write a log message. * @param file the file name of the program source code. * @param line the line number of the program source code. * @param func the function name of the program source code. * @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal * information, Logger::WARN for warning, and Logger::ERROR for fatal error. * @param message the supplement message. */ void log(const char* file, int32_t line, const char* func, BasicDB::Logger::Kind kind, const char* message) { _assert_(file && line > 0 && func && message); db_.log(file, line, func, kind, message); } /** * Set the internal logger. * @param logger the logger object. 
* @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging, * Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal * error. * @return true on success, or false on failure. */ bool tune_logger(BasicDB::Logger* logger, uint32_t kinds = BasicDB::Logger::WARN | BasicDB::Logger::ERROR) { _assert_(logger); return db_.tune_logger(logger, kinds); } /** * Set the internal meta operation trigger. * @param trigger the trigger object. * @return true on success, or false on failure. */ bool tune_meta_trigger(BasicDB::MetaTrigger* trigger) { _assert_(trigger); return db_.tune_meta_trigger(trigger); } protected: /** * Report a message for debugging. * @param file the file name of the program source code. * @param line the line number of the program source code. * @param func the function name of the program source code. * @param format the printf-like format string. * @param ... used according to the format string. */ void report(const char* file, int32_t line, const char* func, const char* format, ...) { _assert_(file && line > 0 && func && format); std::string message; va_list ap; va_start(ap, format); vstrprintf(&message, format, ap); va_end(ap); db_.log(file, line, func, BasicDB::Logger::INFO, message.c_str()); } private: /** * Flush all cache records. * @return true on success, or false on failure. 
*/ bool flush_cache() { _assert_(true); bool err = false; double stime = time(); report(_KCCODELINE_, "flushing the cache"); if (tmpdbs_) { BasicDB* tmpdb = tmpdbs_[dbclock_]; TinyHashMap::Sorter sorter(cache_); const char* kbuf, *vbuf; size_t ksiz, vsiz; while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { const BasicDB::Error& e = tmpdb->error(); db_.set_error(_KCCODELINE_, e.code(), e.message()); err = true; } sorter.step(); } dbclock_ = (dbclock_ + 1) % dbnum_; } else { TinyHashMap::Sorter sorter(cache_); const char* kbuf, *vbuf; size_t ksiz, vsiz; while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { if (!db_.append(kbuf, ksiz, vbuf, vsiz)) err = true; sorter.step(); } } cache_->clear(); csiz_ = 0; double etime = time(); report(_KCCODELINE_, "flushing the cache finished: time=%.6f", etime - stime); return !err; } /** * Merge temporary databases. * @return true on success, or false on failure. */ bool merge_tmpdbs() { _assert_(true); bool err = false; report(_KCCODELINE_, "merging the temporary databases"); double stime = time(); if (!db_.merge(tmpdbs_, dbnum_, PolyDB::MAPPEND)) err = true; dbclock_ = 0; for (size_t i = 0; i < dbnum_; i++) { BasicDB* tmpdb = tmpdbs_[i]; if (!tmpdb->clear()) { const BasicDB::Error& e = tmpdb->error(); set_error(_KCCODELINE_, e.code(), e.message()); err = true; } } double etime = time(); report(_KCCODELINE_, "merging the temporary databases finished: %.6f", etime - stime); return !err; } /** * Remove a record from databases. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @return true on success, or false on failure. 
*/ bool clean_dbs(const char* kbuf, size_t ksiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ); if (db_.remove(kbuf, ksiz)) return true; bool err = false; if (db_.error() != BasicDB::Error::NOREC) err = true; if (tmpdbs_) { for (size_t i = 0; i < dbnum_; i++) { BasicDB* tmpdb = tmpdbs_[i]; if (!tmpdb->remove(kbuf, ksiz)) { const BasicDB::Error& e = tmpdb->error(); if (e != BasicDB::Error::NOREC) { set_error(_KCCODELINE_, e.code(), e.message()); err = true; } } } } return !err; } /** * Check whether a record exists. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @return true if the record exists, or false if not. */ bool check_impl(const char* kbuf, size_t ksiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ); char vbuf; if (db_.get(kbuf, ksiz, &vbuf, 1) >= 0) return true; if (cache_) { size_t vsiz; if (cache_->get(kbuf, ksiz, &vsiz)) return true; if (tmpdbs_) { for (size_t i = 0; i < dbnum_; i++) { BasicDB* tmpdb = tmpdbs_[i]; if (tmpdb->get(kbuf, ksiz, &vbuf, 1)) return true; } } } return false; } /** * Get the number of records. * @return the number of records, or -1 on failure. */ int64_t count_impl() { _assert_(true); int64_t dbcnt = db_.count(); if (dbcnt < 0) return -1; if (!cache_) return dbcnt; int64_t ccnt = cache_->count(); return dbcnt > ccnt ? dbcnt : ccnt; } /** * Get the size of the database file. * @return the size of the database file in bytes. */ int64_t size_impl() { _assert_(true); int64_t dbsiz = db_.size(); if (dbsiz < 0) return -1; return dbsiz + csiz_; } /** Dummy constructor to forbid the use. */ IndexDB(const IndexDB&); /** Dummy Operator to forbid the use. */ IndexDB& operator =(const IndexDB&); /** The method lock. */ RWLock mlock_; /** The internal database. */ PolyDB db_; /** The open mode. */ uint32_t omode_; /** The record comparator. */ Comparator* rcomp_; /** The base path of temporary databases. */ std::string tmppath_; /** The temporary databases. */ BasicDB** tmpdbs_; /** The number of temporary databases. 
*/
  size_t dbnum_;
  /**
   * The logical clock for temporary databases.  It is the index of the temporary database
   * which receives the next flush of the cache; it advances cyclically on each flush and is
   * reset to zero when the temporary databases are merged.
   */
  int64_t dbclock_;
  /**
   * The internal cache.  It is NULL when updating is not available, in which case the
   * updating methods fail with "permission denied".
   */
  TinyHashMap* cache_;
  /**
   * The current size of the internal cache.  It is the sum of the key sizes and the value
   * sizes given to the updating methods since the cache was emptied last.
   */
  int64_t csiz_;
  /**
   * The limit size of the internal cache.  The cache is flushed when the current size
   * exceeds it.
   */
  int64_t clim_;
};

} // common namespace #endif // duplication check // END OF FILE