/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

#include "memcached.h"
#ifdef EXTSTORE

#include "storage.h"

#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <ctype.h>

#define PAGE_BUCKET_DEFAULT 0
#define PAGE_BUCKET_COMPACT 1
#define PAGE_BUCKET_CHUNKED 2
#define PAGE_BUCKET_LOWTTL  3

/*** WRITE FLUSH THREAD ***/

static int storage_write(void *storage, const int clsid, const int item_age) {
    int did_moves = 0;
    struct lru_pull_tail_return it_info;

    it_info.it = NULL;
    lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info);
    /* Item is locked, and we have a reference to it. */
    if (it_info.it == NULL) {
        return did_moves;
    }

    obj_io io;
    item *it = it_info.it;
    /* First, storage for the header object */
    size_t orig_ntotal = ITEM_ntotal(it);
    uint32_t flags;
    if ((it->it_flags & ITEM_HDR) == 0 &&
            (item_age == 0 || current_time - it->time > item_age)) {
        FLAGS_CONV(it, flags);
        item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr));
        /* Run the storage write understanding the start of the item is dirty.
         * We will fill it (time/exptime/etc) from the header item on read.
         */
        if (hdr_it != NULL) {
            int bucket = (it->it_flags & ITEM_CHUNKED) ?
                PAGE_BUCKET_CHUNKED : PAGE_BUCKET_DEFAULT;
            // Compress soon to expire items into similar pages.
            if (it->exptime - current_time < settings.ext_low_ttl) {
                bucket = PAGE_BUCKET_LOWTTL;
            }
            hdr_it->it_flags |= ITEM_HDR;
            io.len = orig_ntotal;
            io.mode = OBJ_IO_WRITE;
            // NOTE: when the item is read back in, the slab mover
            // may see it. Important to have refcount>=2 or ~ITEM_LINKED
            assert(it->refcount >= 2);
            // NOTE: write bucket vs free page bucket will disambiguate once
            // lowttl feature is better understood.
            if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
                // cuddle the hash value into the time field so we don't have
                // to recalculate it.
                item *buf_it = (item *) io.buf;
                buf_it->time = it_info.hv;
                // copy from past the headers + time headers.
                // TODO: should be in items.c
                if (it->it_flags & ITEM_CHUNKED) {
                    // Need to loop through the item and copy
                    item_chunk *sch = (item_chunk *) ITEM_schunk(it);
                    int remain = orig_ntotal;
                    int copied = 0;
                    // copy original header
                    int hdrtotal = ITEM_ntotal(it) - it->nbytes;
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, hdrtotal - STORE_OFFSET);
                    copied = hdrtotal;
                    // copy data in like it were one large object.
                    while (sch && remain) {
                        assert(remain >= sch->used);
                        memcpy((char *)io.buf+copied, sch->data, sch->used);
                        // FIXME: use one variable?
                        remain -= sch->used;
                        copied += sch->used;
                        sch = sch->next;
                    }
                } else {
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, io.len-STORE_OFFSET);
                }
                // crc what we copied so we can do it sequentially.
                buf_it->it_flags &= ~ITEM_LINKED;
                buf_it->exptime = crc32c(0, (char*)io.buf+STORE_OFFSET, orig_ntotal-STORE_OFFSET);
                extstore_write(storage, &io);
                item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it);
                hdr->page_version = io.page_version;
                hdr->page_id = io.page_id;
                hdr->offset = io.offset;
                // overload nbytes for the header it
                hdr_it->nbytes = it->nbytes;
                /* success! Now we need to fill relevant data into the new
                 * header and replace. Most of this requires the item lock */
                /* CAS gets set while linking. Copy post-replace */
                item_replace(it, hdr_it, it_info.hv);
                ITEM_set_cas(hdr_it, ITEM_get_cas(it));
                do_item_remove(hdr_it);
                did_moves = 1;
                LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EXTSTORE_WRITE, it, bucket);
            } else {
                /* Failed to write for some reason, can't continue. */
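                // Most likely the open write buffer for this bucket is full;
                // storage_write_thread treats the resulting zero return as a
                // signal to cut its inner loop and back off for this class.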
                slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it));
            }
        }
    }
    do_item_remove(it);
    item_unlock(it_info.hv);
    return did_moves;
}

static pthread_t storage_write_tid;
static pthread_mutex_t storage_write_plock;
#define WRITE_SLEEP_MAX 1000000
#define WRITE_SLEEP_MIN 500

static void *storage_write_thread(void *arg) {
    void *storage = arg;
    // NOTE: ignoring overflow since that would take years of uptime in a
    // specific load pattern of never going to sleep.
    unsigned int backoff[MAX_NUMBER_OF_SLAB_CLASSES] = {0};
    unsigned int counter = 0;
    useconds_t to_sleep = WRITE_SLEEP_MIN;
    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
        abort();
    }

    pthread_mutex_lock(&storage_write_plock);

    while (1) {
        // cache per-loop to avoid calls to the slabs_clsid() search loop
        int min_class = slabs_clsid(settings.ext_item_size);
        bool do_sleep = true;
        counter++;
        if (to_sleep > WRITE_SLEEP_MAX)
            to_sleep = WRITE_SLEEP_MAX;

        for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
            bool did_move = false;
            bool mem_limit_reached = false;
            unsigned int chunks_free;
            int item_age;
            int target = settings.ext_free_memchunks[x];
            if (min_class > x || (backoff[x] && (counter % backoff[x] != 0))) {
                // Long sleeps mean we should retry classes sooner.
                if (to_sleep > WRITE_SLEEP_MIN * 10)
                    backoff[x] /= 2;
                continue;
            }

            // Avoid extra slab lock calls during heavy writing.
            chunks_free = slabs_available_chunks(x, &mem_limit_reached, NULL);

            // storage_write() will fail and cut loop after filling write buffer.
            while (1) {
                // if we are low on chunks and no spare, push out early.
                if (chunks_free < target && mem_limit_reached) {
                    item_age = 0;
                } else {
                    item_age = settings.ext_item_age;
                }
                if (storage_write(storage, x, item_age)) {
                    chunks_free++; // Allow stopping if we've done enough this loop
                    did_move = true;
                    do_sleep = false;
                    if (to_sleep > WRITE_SLEEP_MIN)
                        to_sleep /= 2;
                } else {
                    break;
                }
            }

            if (!did_move) {
                backoff[x]++;
            } else if (backoff[x]) {
                backoff[x] /= 2;
            }
        }

        // flip lock so we can be paused or stopped
        pthread_mutex_unlock(&storage_write_plock);
        if (do_sleep) {
            usleep(to_sleep);
            to_sleep *= 2;
        }
        pthread_mutex_lock(&storage_write_plock);
    }
    return NULL;
}

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_write_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    // WAKEUP SIGNAL
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_write_pause(void) {
    pthread_mutex_lock(&storage_write_plock);
}

void storage_write_resume(void) {
    pthread_mutex_unlock(&storage_write_plock);
}

int start_storage_write_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_write_plock, NULL);
    if ((ret = pthread_create(&storage_write_tid, NULL,
        storage_write_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_write thread: %s\n",
            strerror(ret));
        return -1;
    }

    return 0;
}

/*** COMPACTOR ***/

/* Fetch stats from the external storage system and decide to compact.
 * If we're more than half full, start skewing how aggressively to run
 * compaction, up to a desired target when all pages are full.
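 *
 * A reading of the selection rule coded below (not a spec): with pages_free F
 * out of page_count P, a page becomes a compaction candidate once its
 * bytes_used drops under page_size * ext_max_frag * (1 - F/P), so the
 * threshold loosens as free pages run out; among candidates the oldest
 * (lowest) page version is reclaimed first.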
 */
static int storage_compact_check(void *storage, logger *l,
        uint32_t *page_id, uint64_t *page_version,
        uint64_t *page_size, bool *drop_unread) {
    struct extstore_stats st;
    int x;
    double rate;
    uint64_t frag_limit;
    uint64_t low_version = ULLONG_MAX;
    uint64_t lowest_version = ULLONG_MAX;
    unsigned int low_page = 0;
    unsigned int lowest_page = 0;
    extstore_get_stats(storage, &st);
    if (st.pages_used == 0)
        return 0;

    // lets pick a target "wasted" value and slew.
    if (st.pages_free > settings.ext_compact_under)
        return 0;
    *drop_unread = false;

    // the number of free pages reduces the configured frag limit
    // this allows us to defrag early if pages are very empty.
    rate = 1.0 - ((double)st.pages_free / st.page_count);
    rate *= settings.ext_max_frag;
    frag_limit = st.page_size * rate;
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO,
            NULL, rate, frag_limit);
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    extstore_get_page_data(storage, &st);

    // find oldest page by version that violates the constraint
    for (x = 0; x < st.page_count; x++) {
        if (st.page_data[x].version == 0 ||
            st.page_data[x].bucket == PAGE_BUCKET_LOWTTL)
            continue;
        if (st.page_data[x].version < lowest_version) {
            lowest_page = x;
            lowest_version = st.page_data[x].version;
        }
        if (st.page_data[x].bytes_used < frag_limit) {
            if (st.page_data[x].version < low_version) {
                low_page = x;
                low_version = st.page_data[x].version;
            }
        }
    }
    *page_size = st.page_size;
    free(st.page_data);

    // we have a page + version to attempt to reclaim.
    if (low_version != ULLONG_MAX) {
        *page_id = low_page;
        *page_version = low_version;
        return 1;
    } else if (lowest_version != ULLONG_MAX && settings.ext_drop_unread
            && st.pages_free <= settings.ext_drop_under) {
        // nothing matched the frag rate barrier, so pick the absolute oldest
        // version if we're configured to drop items.
        *page_id = lowest_page;
        *page_version = lowest_version;
        *drop_unread = true;
        return 1;
    }

    return 0;
}

static pthread_t storage_compact_tid;
static pthread_mutex_t storage_compact_plock;
#define MIN_STORAGE_COMPACT_SLEEP 10000
#define MAX_STORAGE_COMPACT_SLEEP 2000000

struct storage_compact_wrap {
    obj_io io;
    pthread_mutex_t lock; // gates the bools.
    bool done;
    bool submitted;
    bool miss; // version flipped out from under us
};

static void storage_compact_readback(void *storage, logger *l,
        bool drop_unread, char *readback_buf,
        uint32_t page_id, uint64_t page_version, uint64_t read_size) {
    uint64_t offset = 0;
    unsigned int rescues = 0;
    unsigned int lost = 0;
    unsigned int skipped = 0;

    while (offset < read_size) {
        item *hdr_it = NULL;
        item_hdr *hdr = NULL;
        item *it = (item *)(readback_buf+offset);
        unsigned int ntotal;
        // probably zeroed out junk at the end of the wbuf
        if (it->nkey == 0) {
            break;
        }

        ntotal = ITEM_ntotal(it);
        uint32_t hv = (uint32_t)it->time;
        item_lock(hv);

        // We don't have a conn and don't need to do most of do_item_get
        hdr_it = assoc_find(ITEM_key(it), it->nkey, hv);
        if (hdr_it != NULL) {
            bool do_write = false;
            refcount_incr(hdr_it);

            // Check validity but don't bother removing it.
            if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it)
                    && (hdr_it->exptime == 0 || hdr_it->exptime > current_time)) {
                hdr = (item_hdr *)ITEM_data(hdr_it);
                if (hdr->page_id == page_id && hdr->page_version == page_version) {
                    // Item header is still completely valid.
                    extstore_delete(storage, page_id, page_version, 1, ntotal);
                    // drop inactive items.
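                    // Headers still sitting in the COLD LRU have not seen
                    // recent hits, so with drop_unread set we count them as
                    // skipped rather than rewriting them; the data simply
                    // dies with the old page instead of being carried forward.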
                    if (drop_unread && GET_LRU(hdr_it->slabs_clsid) == COLD_LRU) {
                        do_write = false;
                        skipped++;
                    } else {
                        do_write = true;
                    }
                }
            }

            if (do_write) {
                bool do_update = false;
                int tries;
                obj_io io;
                io.len = ntotal;
                io.mode = OBJ_IO_WRITE;
                for (tries = 10; tries > 0; tries--) {
                    if (extstore_write_request(storage, PAGE_BUCKET_COMPACT, PAGE_BUCKET_COMPACT, &io) == 0) {
                        memcpy(io.buf, it, io.len);
                        extstore_write(storage, &io);
                        do_update = true;
                        break;
                    } else {
                        usleep(1000);
                    }
                }

                if (do_update) {
                    if (it->refcount == 2) {
                        hdr->page_version = io.page_version;
                        hdr->page_id = io.page_id;
                        hdr->offset = io.offset;
                        rescues++;
                    } else {
                        lost++;
                        // TODO: re-alloc and replace header.
                    }
                } else {
                    lost++;
                }
            }

            do_item_remove(hdr_it);
        }

        item_unlock(hv);
        offset += ntotal;
        if (read_size - offset < sizeof(struct _stritem))
            break;
    }

    STATS_LOCK();
    stats.extstore_compact_lost += lost;
    stats.extstore_compact_rescues += rescues;
    stats.extstore_compact_skipped += skipped;
    STATS_UNLOCK();

    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END,
            NULL, page_id, offset, rescues, lost, skipped);
}

static void _storage_compact_cb(void *e, obj_io *io, int ret) {
    struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data;
    assert(wrap->submitted == true);

    pthread_mutex_lock(&wrap->lock);

    if (ret < 1) {
        wrap->miss = true;
    }
    wrap->done = true;

    pthread_mutex_unlock(&wrap->lock);
}

// TODO: hoist the storage bits from lru_maintainer_thread in here.
// would be nice if they could avoid hammering the same locks though?
// I guess it's only COLD. that's probably fine.
static void *storage_compact_thread(void *arg) {
    void *storage = arg;
    useconds_t to_sleep = MAX_STORAGE_COMPACT_SLEEP;
    bool compacting = false;
    uint64_t page_version = 0;
    uint64_t page_size = 0;
    uint64_t page_offset = 0;
    uint32_t page_id = 0;
    bool drop_unread = false;
    char *readback_buf = NULL;
    struct storage_compact_wrap wrap;

    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
        abort();
    }

    readback_buf = malloc(settings.ext_wbuf_size);
    if (readback_buf == NULL) {
        fprintf(stderr, "Failed to allocate readback buffer for storage compaction thread\n");
        abort();
    }

    pthread_mutex_init(&wrap.lock, NULL);
    wrap.done = false;
    wrap.submitted = false;
    wrap.io.data = &wrap;
    wrap.io.buf = (void *)readback_buf;

    wrap.io.len = settings.ext_wbuf_size;
    wrap.io.mode = OBJ_IO_READ;
    wrap.io.cb = _storage_compact_cb;
    pthread_mutex_lock(&storage_compact_plock);

    while (1) {
        pthread_mutex_unlock(&storage_compact_plock);
        if (to_sleep) {
            extstore_run_maint(storage);
            usleep(to_sleep);
        }
        pthread_mutex_lock(&storage_compact_plock);

        if (!compacting && storage_compact_check(storage, l,
                    &page_id, &page_version, &page_size, &drop_unread)) {
            page_offset = 0;
            compacting = true;
            LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START,
                    NULL, page_id, page_version);
        }

        if (compacting) {
            pthread_mutex_lock(&wrap.lock);
            if (page_offset < page_size && !wrap.done && !wrap.submitted) {
                wrap.io.page_version = page_version;
                wrap.io.page_id = page_id;
                wrap.io.offset = page_offset;
                // FIXME: should be smarter about io->next (unlink at use?)
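                // One wbuf-sized chunk of the old page is read per pass;
                // _storage_compact_cb flips done (or miss) under wrap.lock,
                // and the next loop iteration either reads the buffer back
                // or aborts the compaction.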
                wrap.io.next = NULL;
                wrap.submitted = true;
                wrap.miss = false;

                extstore_submit(storage, &wrap.io);
            } else if (wrap.miss) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT, NULL, page_id);
                wrap.done = false;
                wrap.submitted = false;
                compacting = false;
            } else if (wrap.submitted && wrap.done) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START,
                        NULL, page_id, page_offset);
                storage_compact_readback(storage, l, drop_unread,
                        readback_buf, page_id, page_version, settings.ext_wbuf_size);
                page_offset += settings.ext_wbuf_size;
                wrap.done = false;
                wrap.submitted = false;
            } else if (page_offset >= page_size) {
                compacting = false;
                wrap.done = false;
                wrap.submitted = false;
                extstore_close_page(storage, page_id, page_version);
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END,
                        NULL, page_id);
            }
            pthread_mutex_unlock(&wrap.lock);

            if (to_sleep > MIN_STORAGE_COMPACT_SLEEP)
                to_sleep /= 2;
        } else {
            if (to_sleep < MAX_STORAGE_COMPACT_SLEEP)
                to_sleep += MIN_STORAGE_COMPACT_SLEEP;
        }
    }
    free(readback_buf);

    return NULL;
}

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_compact_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_compact_pause(void) {
    pthread_mutex_lock(&storage_compact_plock);
}

void storage_compact_resume(void) {
    pthread_mutex_unlock(&storage_compact_plock);
}

int start_storage_compact_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_compact_plock, NULL);
    if ((ret = pthread_create(&storage_compact_tid, NULL,
        storage_compact_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_compact thread: %s\n",
            strerror(ret));
        return -1;
    }

    return 0;
}

/*** UTILITY ***/
// /path/to/file:100G:bucket1
// FIXME: Modifies argument. copy instead?
struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size) {
    struct extstore_conf_file *cf = NULL;
    char *b = NULL;
    char *p = strtok_r(arg, ":", &b);
    char unit = 0;
    uint64_t multiplier = 0;
    int base_size = 0;
    if (p == NULL)
        goto error;
    // First arg is the filepath.
    cf = calloc(1, sizeof(struct extstore_conf_file));
    cf->file = strdup(p);

    p = strtok_r(NULL, ":", &b);
    if (p == NULL) {
        fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
        goto error;
    }
    unit = tolower(p[strlen(p)-1]);
    p[strlen(p)-1] = '\0';
    // sigh.
    switch (unit) {
        case 'm':
            multiplier = 1024 * 1024;
            break;
        case 'g':
            multiplier = 1024 * 1024 * 1024;
            break;
        case 't':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024;
            break;
        case 'p':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024 * 1024;
            break;
    }
    base_size = atoi(p);
    multiplier *= base_size;
    // page_count is nearest-but-not-larger-than pages * psize
    cf->page_count = multiplier / page_size;
    assert(page_size * cf->page_count <= multiplier);

    // final token would be a default free bucket
    p = strtok_r(NULL, ",", &b);
    // TODO: We reuse the original DEFINES for now,
    // but if lowttl gets split up this needs to be its own set.
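    // Illustrative only: an arg such as "/mnt/ext1/extstore:64m:compact"
    // (hypothetical path) would map the trailing token to PAGE_BUCKET_COMPACT
    // below, though anything but the default bucket is rejected further down.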
    if (p != NULL) {
        if (strcmp(p, "compact") == 0) {
            cf->free_bucket = PAGE_BUCKET_COMPACT;
        } else if (strcmp(p, "lowttl") == 0) {
            cf->free_bucket = PAGE_BUCKET_LOWTTL;
        } else if (strcmp(p, "chunked") == 0) {
            cf->free_bucket = PAGE_BUCKET_CHUNKED;
        } else if (strcmp(p, "default") == 0) {
            cf->free_bucket = PAGE_BUCKET_DEFAULT;
        } else {
            fprintf(stderr, "Unknown extstore bucket: %s\n", p);
            goto error;
        }
    } else {
        // TODO: is this necessary?
        cf->free_bucket = PAGE_BUCKET_DEFAULT;
    }

    // TODO: disabling until compact algorithm is improved.
    if (cf->free_bucket != PAGE_BUCKET_DEFAULT) {
        fprintf(stderr, "ext_path only presently supports the default bucket\n");
        goto error;
    }

    return cf;
error:
    if (cf) {
        if (cf->file)
            free(cf->file);
        free(cf);
    }
    return NULL;
}

#endif