/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "memcached.h"
#ifdef EXTSTORE

#include "storage.h"
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <ctype.h>

/* Page buckets passed to extstore_write_request() by this file. */
/* Non-chunked items flushed by storage_write(); also the only free bucket
 * storage_conf_parse() currently accepts. */
#define PAGE_BUCKET_DEFAULT 0
/* Items rewritten by storage_compact_readback() while compacting a page. */
#define PAGE_BUCKET_COMPACT 1
/* Items carrying ITEM_CHUNKED when flushed by storage_write(). */
#define PAGE_BUCKET_CHUNKED 2
/* Items whose remaining TTL is below settings.ext_low_ttl when flushed;
 * pages in this bucket are skipped by storage_compact_check(). */
#define PAGE_BUCKET_LOWTTL  3

/*** WRITE FLUSH THREAD ***/
Packit 4e8bc4
Packit 4e8bc4
static int storage_write(void *storage, const int clsid, const int item_age) {
Packit 4e8bc4
    int did_moves = 0;
Packit 4e8bc4
    struct lru_pull_tail_return it_info;
Packit 4e8bc4
Packit 4e8bc4
    it_info.it = NULL;
Packit 4e8bc4
    lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info);
Packit 4e8bc4
    /* Item is locked, and we have a reference to it. */
Packit 4e8bc4
    if (it_info.it == NULL) {
Packit 4e8bc4
        return did_moves;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    obj_io io;
Packit 4e8bc4
    item *it = it_info.it;
Packit 4e8bc4
    /* First, storage for the header object */
Packit 4e8bc4
    size_t orig_ntotal = ITEM_ntotal(it);
Packit 4e8bc4
    uint32_t flags;
Packit 4e8bc4
    if ((it->it_flags & ITEM_HDR) == 0 &&
Packit 4e8bc4
            (item_age == 0 || current_time - it->time > item_age)) {
Packit 4e8bc4
        FLAGS_CONV(it, flags);
Packit 4e8bc4
        item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr));
Packit 4e8bc4
        /* Run the storage write understanding the start of the item is dirty.
Packit 4e8bc4
         * We will fill it (time/exptime/etc) from the header item on read.
Packit 4e8bc4
         */
Packit 4e8bc4
        if (hdr_it != NULL) {
Packit 4e8bc4
            int bucket = (it->it_flags & ITEM_CHUNKED) ?
Packit 4e8bc4
                PAGE_BUCKET_CHUNKED : PAGE_BUCKET_DEFAULT;
Packit 4e8bc4
            // Compress soon to expire items into similar pages.
Packit 4e8bc4
            if (it->exptime - current_time < settings.ext_low_ttl) {
Packit 4e8bc4
                bucket = PAGE_BUCKET_LOWTTL;
Packit 4e8bc4
            }
Packit 4e8bc4
            hdr_it->it_flags |= ITEM_HDR;
Packit 4e8bc4
            io.len = orig_ntotal;
Packit 4e8bc4
            io.mode = OBJ_IO_WRITE;
Packit 4e8bc4
            // NOTE: when the item is read back in, the slab mover
Packit 4e8bc4
            // may see it. Important to have refcount>=2 or ~ITEM_LINKED
Packit 4e8bc4
            assert(it->refcount >= 2);
Packit 4e8bc4
            // NOTE: write bucket vs free page bucket will disambiguate once
Packit 4e8bc4
            // lowttl feature is better understood.
Packit 4e8bc4
            if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
Packit 4e8bc4
                // cuddle the hash value into the time field so we don't have
Packit 4e8bc4
                // to recalculate it.
Packit 4e8bc4
                item *buf_it = (item *) io.buf;
Packit 4e8bc4
                buf_it->time = it_info.hv;
Packit 4e8bc4
                // copy from past the headers + time headers.
Packit 4e8bc4
                // TODO: should be in items.c
Packit 4e8bc4
                if (it->it_flags & ITEM_CHUNKED) {
Packit 4e8bc4
                    // Need to loop through the item and copy
Packit 4e8bc4
                    item_chunk *sch = (item_chunk *) ITEM_schunk(it);
Packit 4e8bc4
                    int remain = orig_ntotal;
Packit 4e8bc4
                    int copied = 0;
Packit 4e8bc4
                    // copy original header
Packit 4e8bc4
                    int hdrtotal = ITEM_ntotal(it) - it->nbytes;
Packit 4e8bc4
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, hdrtotal - STORE_OFFSET);
Packit 4e8bc4
                    copied = hdrtotal;
Packit 4e8bc4
                    // copy data in like it were one large object.
Packit 4e8bc4
                    while (sch && remain) {
Packit 4e8bc4
                        assert(remain >= sch->used);
Packit 4e8bc4
                        memcpy((char *)io.buf+copied, sch->data, sch->used);
Packit 4e8bc4
                        // FIXME: use one variable?
Packit 4e8bc4
                        remain -= sch->used;
Packit 4e8bc4
                        copied += sch->used;
Packit 4e8bc4
                        sch = sch->next;
Packit 4e8bc4
                    }
Packit 4e8bc4
                } else {
Packit 4e8bc4
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, io.len-STORE_OFFSET);
Packit 4e8bc4
                }
Packit 4e8bc4
                // crc what we copied so we can do it sequentially.
Packit 4e8bc4
                buf_it->it_flags &= ~ITEM_LINKED;
Packit 4e8bc4
                buf_it->exptime = crc32c(0, (char*)io.buf+STORE_OFFSET, orig_ntotal-STORE_OFFSET);
Packit 4e8bc4
                extstore_write(storage, &io);
Packit 4e8bc4
                item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it);
Packit 4e8bc4
                hdr->page_version = io.page_version;
Packit 4e8bc4
                hdr->page_id = io.page_id;
Packit 4e8bc4
                hdr->offset  = io.offset;
Packit 4e8bc4
                // overload nbytes for the header it
Packit 4e8bc4
                hdr_it->nbytes = it->nbytes;
Packit 4e8bc4
                /* success! Now we need to fill relevant data into the new
Packit 4e8bc4
                 * header and replace. Most of this requires the item lock
Packit 4e8bc4
                 */
Packit 4e8bc4
                /* CAS gets set while linking. Copy post-replace */
Packit 4e8bc4
                item_replace(it, hdr_it, it_info.hv);
Packit 4e8bc4
                ITEM_set_cas(hdr_it, ITEM_get_cas(it));
Packit 4e8bc4
                do_item_remove(hdr_it);
Packit 4e8bc4
                did_moves = 1;
Packit 4e8bc4
                LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EXTSTORE_WRITE, it, bucket);
Packit 4e8bc4
            } else {
Packit 4e8bc4
                /* Failed to write for some reason, can't continue. */
Packit 4e8bc4
                slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it));
Packit 4e8bc4
            }
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
    do_item_remove(it);
Packit 4e8bc4
    item_unlock(it_info.hv);
Packit 4e8bc4
    return did_moves;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
static pthread_t storage_write_tid;
Packit 4e8bc4
static pthread_mutex_t storage_write_plock;
Packit 4e8bc4
#define WRITE_SLEEP_MAX 1000000
Packit 4e8bc4
#define WRITE_SLEEP_MIN 500
Packit 4e8bc4
Packit 4e8bc4
static void *storage_write_thread(void *arg) {
Packit 4e8bc4
    void *storage = arg;
Packit 4e8bc4
    // NOTE: ignoring overflow since that would take years of uptime in a
Packit 4e8bc4
    // specific load pattern of never going to sleep.
Packit 4e8bc4
    unsigned int backoff[MAX_NUMBER_OF_SLAB_CLASSES] = {0};
Packit 4e8bc4
    unsigned int counter = 0;
Packit 4e8bc4
    useconds_t to_sleep = WRITE_SLEEP_MIN;
Packit 4e8bc4
    logger *l = logger_create();
Packit 4e8bc4
    if (l == NULL) {
Packit 4e8bc4
        fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
Packit 4e8bc4
        abort();
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_lock(&storage_write_plock);
Packit 4e8bc4
Packit 4e8bc4
    while (1) {
Packit 4e8bc4
        // cache per-loop to avoid calls to the slabs_clsid() search loop
Packit 4e8bc4
        int min_class = slabs_clsid(settings.ext_item_size);
Packit 4e8bc4
        bool do_sleep = true;
Packit 4e8bc4
        counter++;
Packit 4e8bc4
        if (to_sleep > WRITE_SLEEP_MAX)
Packit 4e8bc4
            to_sleep = WRITE_SLEEP_MAX;
Packit 4e8bc4
Packit 4e8bc4
        for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
Packit 4e8bc4
            bool did_move = false;
Packit 4e8bc4
            bool mem_limit_reached = false;
Packit 4e8bc4
            unsigned int chunks_free;
Packit 4e8bc4
            int item_age;
Packit 4e8bc4
            int target = settings.ext_free_memchunks[x];
Packit 4e8bc4
            if (min_class > x || (backoff[x] && (counter % backoff[x] != 0))) {
Packit 4e8bc4
                // Long sleeps means we should retry classes sooner.
Packit 4e8bc4
                if (to_sleep > WRITE_SLEEP_MIN * 10)
Packit 4e8bc4
                    backoff[x] /= 2;
Packit 4e8bc4
                continue;
Packit 4e8bc4
            }
Packit 4e8bc4
Packit 4e8bc4
            // Avoid extra slab lock calls during heavy writing.
Packit 4e8bc4
            chunks_free = slabs_available_chunks(x, &mem_limit_reached,
Packit 4e8bc4
                    NULL);
Packit 4e8bc4
Packit 4e8bc4
            // storage_write() will fail and cut loop after filling write buffer.
Packit 4e8bc4
            while (1) {
Packit 4e8bc4
                // if we are low on chunks and no spare, push out early.
Packit 4e8bc4
                if (chunks_free < target && mem_limit_reached) {
Packit 4e8bc4
                    item_age = 0;
Packit 4e8bc4
                } else {
Packit 4e8bc4
                    item_age = settings.ext_item_age;
Packit 4e8bc4
                }
Packit 4e8bc4
                if (storage_write(storage, x, item_age)) {
Packit 4e8bc4
                    chunks_free++; // Allow stopping if we've done enough this loop
Packit 4e8bc4
                    did_move = true;
Packit 4e8bc4
                    do_sleep = false;
Packit 4e8bc4
                    if (to_sleep > WRITE_SLEEP_MIN)
Packit 4e8bc4
                        to_sleep /= 2;
Packit 4e8bc4
                } else {
Packit 4e8bc4
                    break;
Packit 4e8bc4
                }
Packit 4e8bc4
            }
Packit 4e8bc4
Packit 4e8bc4
            if (!did_move) {
Packit 4e8bc4
                backoff[x]++;
Packit 4e8bc4
            } else if (backoff[x]) {
Packit 4e8bc4
                backoff[x] /= 2;
Packit 4e8bc4
            }
Packit 4e8bc4
        }
Packit 4e8bc4
Packit 4e8bc4
        // flip lock so we can be paused or stopped
Packit 4e8bc4
        pthread_mutex_unlock(&storage_write_plock);
Packit 4e8bc4
        if (do_sleep) {
Packit 4e8bc4
            usleep(to_sleep);
Packit 4e8bc4
            to_sleep *= 2;
Packit 4e8bc4
        }
Packit 4e8bc4
        pthread_mutex_lock(&storage_write_plock);
Packit 4e8bc4
    }
Packit 4e8bc4
    return NULL;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
// TODO: provide a way to stop the storage write thread.
// logger needs logger_destroy() to exist/work before this is safe.
// NOTE: the disabled sketch below still uses the LRU maintainer's lock, run
// flag, thread id and messages; adapt it to this thread before enabling it.
/*int stop_storage_write_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    // WAKEUP SIGNAL
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_write_pause(void) {
Packit 4e8bc4
    pthread_mutex_lock(&storage_write_plock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void storage_write_resume(void) {
Packit 4e8bc4
    pthread_mutex_unlock(&storage_write_plock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
int start_storage_write_thread(void *arg) {
Packit 4e8bc4
    int ret;
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_init(&storage_write_plock, NULL);
Packit 4e8bc4
    if ((ret = pthread_create(&storage_write_tid, NULL,
Packit 4e8bc4
        storage_write_thread, arg)) != 0) {
Packit 4e8bc4
        fprintf(stderr, "Can't create storage_write thread: %s\n",
Packit 4e8bc4
            strerror(ret));
Packit 4e8bc4
        return -1;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    return 0;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*** COMPACTOR ***/
Packit 4e8bc4
Packit 4e8bc4
/* Fetch stats from the external storage system and decide to compact.
Packit 4e8bc4
 * If we're more than half full, start skewing how aggressively to run
Packit 4e8bc4
 * compaction, up to a desired target when all pages are full.
Packit 4e8bc4
 */
Packit 4e8bc4
static int storage_compact_check(void *storage, logger *l,
Packit 4e8bc4
        uint32_t *page_id, uint64_t *page_version,
Packit 4e8bc4
        uint64_t *page_size, bool *drop_unread) {
Packit 4e8bc4
    struct extstore_stats st;
Packit 4e8bc4
    int x;
Packit 4e8bc4
    double rate;
Packit 4e8bc4
    uint64_t frag_limit;
Packit 4e8bc4
    uint64_t low_version = ULLONG_MAX;
Packit 4e8bc4
    uint64_t lowest_version = ULLONG_MAX;
Packit 4e8bc4
    unsigned int low_page = 0;
Packit 4e8bc4
    unsigned int lowest_page = 0;
Packit 4e8bc4
    extstore_get_stats(storage, &st);
Packit 4e8bc4
    if (st.pages_used == 0)
Packit 4e8bc4
        return 0;
Packit 4e8bc4
Packit 4e8bc4
    // lets pick a target "wasted" value and slew.
Packit 4e8bc4
    if (st.pages_free > settings.ext_compact_under)
Packit 4e8bc4
        return 0;
Packit 4e8bc4
    *drop_unread = false;
Packit 4e8bc4
Packit 4e8bc4
    // the number of free pages reduces the configured frag limit
Packit 4e8bc4
    // this allows us to defrag early if pages are very empty.
Packit 4e8bc4
    rate = 1.0 - ((double)st.pages_free / st.page_count);
Packit 4e8bc4
    rate *= settings.ext_max_frag;
Packit 4e8bc4
    frag_limit = st.page_size * rate;
Packit 4e8bc4
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO,
Packit 4e8bc4
            NULL, rate, frag_limit);
Packit 4e8bc4
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
Packit 4e8bc4
    extstore_get_page_data(storage, &st);
Packit 4e8bc4
Packit 4e8bc4
    // find oldest page by version that violates the constraint
Packit 4e8bc4
    for (x = 0; x < st.page_count; x++) {
Packit 4e8bc4
        if (st.page_data[x].version == 0 ||
Packit 4e8bc4
            st.page_data[x].bucket == PAGE_BUCKET_LOWTTL)
Packit 4e8bc4
            continue;
Packit 4e8bc4
        if (st.page_data[x].version < lowest_version) {
Packit 4e8bc4
            lowest_page = x;
Packit 4e8bc4
            lowest_version = st.page_data[x].version;
Packit 4e8bc4
        }
Packit 4e8bc4
        if (st.page_data[x].bytes_used < frag_limit) {
Packit 4e8bc4
            if (st.page_data[x].version < low_version) {
Packit 4e8bc4
                low_page = x;
Packit 4e8bc4
                low_version = st.page_data[x].version;
Packit 4e8bc4
            }
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
    *page_size = st.page_size;
Packit 4e8bc4
    free(st.page_data);
Packit 4e8bc4
Packit 4e8bc4
    // we have a page + version to attempt to reclaim.
Packit 4e8bc4
    if (low_version != ULLONG_MAX) {
Packit 4e8bc4
        *page_id = low_page;
Packit 4e8bc4
        *page_version = low_version;
Packit 4e8bc4
        return 1;
Packit 4e8bc4
    } else if (lowest_version != ULLONG_MAX && settings.ext_drop_unread
Packit 4e8bc4
            && st.pages_free <= settings.ext_drop_under) {
Packit 4e8bc4
        // nothing matched the frag rate barrier, so pick the absolute oldest
Packit 4e8bc4
        // version if we're configured to drop items.
Packit 4e8bc4
        *page_id = lowest_page;
Packit 4e8bc4
        *page_version = lowest_version;
Packit 4e8bc4
        *drop_unread = true;
Packit 4e8bc4
        return 1;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    return 0;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
static pthread_t storage_compact_tid;
Packit 4e8bc4
static pthread_mutex_t storage_compact_plock;
Packit 4e8bc4
#define MIN_STORAGE_COMPACT_SLEEP 10000
Packit 4e8bc4
#define MAX_STORAGE_COMPACT_SLEEP 2000000
Packit 4e8bc4
Packit 4e8bc4
struct storage_compact_wrap {
Packit 4e8bc4
    obj_io io;
Packit 4e8bc4
    pthread_mutex_t lock; // gates the bools.
Packit 4e8bc4
    bool done;
Packit 4e8bc4
    bool submitted;
Packit 4e8bc4
    bool miss; // version flipped out from under us
Packit 4e8bc4
};
Packit 4e8bc4
Packit 4e8bc4
/*
 * Walk the items in a buffer read back from a page being compacted and
 * rewrite the ones that are still live.
 *
 * For each item in readback_buf, the hash value stashed in its time field is
 * used to lock the item and look its key up with assoc_find(). If the
 * in-memory entry is an unexpired, unflushed header item that still points
 * at (page_id, page_version), the old copy is released with
 * extstore_delete() and, unless it is being dropped, the item is written
 * into PAGE_BUCKET_COMPACT and the header is repointed.
 *
 * storage:      extstore handle.
 * l:            logger used for the end-of-read log line.
 * drop_unread:  when true, items whose header sits in the COLD LRU are not
 *               rewritten (counted as skipped).
 * readback_buf: buffer holding read_size bytes read from the page.
 * page_id / page_version: identify the page being compacted.
 * read_size:    number of bytes of readback_buf to scan.
 *
 * Adds the rescued/lost/skipped counts to the global stats under
 * STATS_LOCK().
 */
static void storage_compact_readback(void *storage, logger *l,
        bool drop_unread, char *readback_buf,
        uint32_t page_id, uint64_t page_version, uint64_t read_size) {
    uint64_t offset = 0;
    unsigned int rescues = 0;
    unsigned int lost = 0;
    unsigned int skipped = 0;

    while (offset < read_size) {
        item *hdr_it = NULL;
        item_hdr *hdr = NULL;
        item *it = (item *)(readback_buf+offset);
        unsigned int ntotal;
        // probably zeroed out junk at the end of the wbuf
        if (it->nkey == 0) {
            break;
        }

        ntotal = ITEM_ntotal(it);
        // storage_write() stored the hash value in the time field.
        uint32_t hv = (uint32_t)it->time;
        item_lock(hv);
        // We don't have a conn and don't need to do most of do_item_get
        hdr_it = assoc_find(ITEM_key(it), it->nkey, hv);
        if (hdr_it != NULL) {
            bool do_write = false;
            refcount_incr(hdr_it);

            // Check validity but don't bother removing it.
            if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it) &&
                   (hdr_it->exptime == 0 || hdr_it->exptime > current_time)) {
                hdr = (item_hdr *)ITEM_data(hdr_it);
                if (hdr->page_id == page_id && hdr->page_version == page_version) {
                    // Item header is still completely valid.
                    extstore_delete(storage, page_id, page_version, 1, ntotal);
                    // drop inactive items.
                    if (drop_unread && GET_LRU(hdr_it->slabs_clsid) == COLD_LRU) {
                        do_write = false;
                        skipped++;
                    } else {
                        do_write = true;
                    }
                }
            }

            if (do_write) {
                bool do_update = false;
                int tries;
                obj_io io;
                io.len = ntotal;
                io.mode = OBJ_IO_WRITE;
                // Retry a bounded number of times if no write buffer is
                // available yet.
                for (tries = 10; tries > 0; tries--) {
                    if (extstore_write_request(storage, PAGE_BUCKET_COMPACT, PAGE_BUCKET_COMPACT, &io) == 0) {
                        memcpy(io.buf, it, io.len);
                        extstore_write(storage, &io);
                        do_update = true;
                        break;
                    } else {
                        usleep(1000);
                    }
                }

                if (do_update) {
                    // NOTE(review): this tests the refcount stored in the
                    // read-back copy (`it`), not the live header `hdr_it`
                    // whose reference was taken above -- confirm which one
                    // is intended before changing it.
                    if (it->refcount == 2) {
                        hdr->page_version = io.page_version;
                        hdr->page_id = io.page_id;
                        hdr->offset = io.offset;
                        rescues++;
                    } else {
                        lost++;
                        // TODO: re-alloc and replace header.
                    }
                } else {
                    lost++;
                }
            }

            do_item_remove(hdr_it);
        }

        item_unlock(hv);
        offset += ntotal;
        // Stop when less than one item header's worth of bytes remains.
        if (read_size - offset < sizeof(struct _stritem))
            break;
    }

    STATS_LOCK();
    stats.extstore_compact_lost += lost;
    stats.extstore_compact_rescues += rescues;
    stats.extstore_compact_skipped += skipped;
    STATS_UNLOCK();
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END,
            NULL, page_id, offset, rescues, lost, skipped);
}

/*
 * Completion callback for the compaction read request.
 *
 * e:   unused here.
 * io:  the completed request; io->data must point at the owning
 *      struct storage_compact_wrap.
 * ret: result of the read; any value below 1 is recorded as a miss.
 *
 * Marks the request done (and possibly missed) under wrap->lock.
 */
static void _storage_compact_cb(void *e, obj_io *io, int ret) {
    struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data;
    assert(wrap->submitted == true);

    pthread_mutex_lock(&wrap->lock);

    if (ret < 1) {
        wrap->miss = true;
    }
    wrap->done = true;

    pthread_mutex_unlock(&wrap->lock);
}

// TODO: hoist the storage bits from lru_maintainer_thread in here.
Packit 4e8bc4
// would be nice if they could avoid hammering the same locks though?
Packit 4e8bc4
// I guess it's only COLD. that's probably fine.
Packit 4e8bc4
static void *storage_compact_thread(void *arg) {
Packit 4e8bc4
    void *storage = arg;
Packit 4e8bc4
    useconds_t to_sleep = MAX_STORAGE_COMPACT_SLEEP;
Packit 4e8bc4
    bool compacting = false;
Packit 4e8bc4
    uint64_t page_version = 0;
Packit 4e8bc4
    uint64_t page_size = 0;
Packit 4e8bc4
    uint64_t page_offset = 0;
Packit 4e8bc4
    uint32_t page_id = 0;
Packit 4e8bc4
    bool drop_unread = false;
Packit 4e8bc4
    char *readback_buf = NULL;
Packit 4e8bc4
    struct storage_compact_wrap wrap;
Packit 4e8bc4
Packit 4e8bc4
    logger *l = logger_create();
Packit 4e8bc4
    if (l == NULL) {
Packit 4e8bc4
        fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
Packit 4e8bc4
        abort();
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    readback_buf = malloc(settings.ext_wbuf_size);
Packit 4e8bc4
    if (readback_buf == NULL) {
Packit 4e8bc4
        fprintf(stderr, "Failed to allocate readback buffer for storage compaction thread\n");
Packit 4e8bc4
        abort();
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_init(&wrap.lock, NULL);
Packit 4e8bc4
    wrap.done = false;
Packit 4e8bc4
    wrap.submitted = false;
Packit 4e8bc4
    wrap.io.data = &wra;;
Packit 4e8bc4
    wrap.io.buf = (void *)readback_buf;
Packit 4e8bc4
Packit 4e8bc4
    wrap.io.len = settings.ext_wbuf_size;
Packit 4e8bc4
    wrap.io.mode = OBJ_IO_READ;
Packit 4e8bc4
    wrap.io.cb = _storage_compact_cb;
Packit 4e8bc4
    pthread_mutex_lock(&storage_compact_plock);
Packit 4e8bc4
Packit 4e8bc4
    while (1) {
Packit 4e8bc4
        pthread_mutex_unlock(&storage_compact_plock);
Packit 4e8bc4
        if (to_sleep) {
Packit 4e8bc4
            extstore_run_maint(storage);
Packit 4e8bc4
            usleep(to_sleep);
Packit 4e8bc4
        }
Packit 4e8bc4
        pthread_mutex_lock(&storage_compact_plock);
Packit 4e8bc4
Packit 4e8bc4
        if (!compacting && storage_compact_check(storage, l,
Packit 4e8bc4
                    &page_id, &page_version, &page_size, &drop_unread)) {
Packit 4e8bc4
            page_offset = 0;
Packit 4e8bc4
            compacting = true;
Packit 4e8bc4
            LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START,
Packit 4e8bc4
                    NULL, page_id, page_version);
Packit 4e8bc4
        }
Packit 4e8bc4
Packit 4e8bc4
        if (compacting) {
Packit 4e8bc4
            pthread_mutex_lock(&wrap.lock);
Packit 4e8bc4
            if (page_offset < page_size && !wrap.done && !wrap.submitted) {
Packit 4e8bc4
                wrap.io.page_version = page_version;
Packit 4e8bc4
                wrap.io.page_id = page_id;
Packit 4e8bc4
                wrap.io.offset = page_offset;
Packit 4e8bc4
                // FIXME: should be smarter about io->next (unlink at use?)
Packit 4e8bc4
                wrap.io.next = NULL;
Packit 4e8bc4
                wrap.submitted = true;
Packit 4e8bc4
                wrap.miss = false;
Packit 4e8bc4
Packit 4e8bc4
                extstore_submit(storage, &wrap.io);
Packit 4e8bc4
            } else if (wrap.miss) {
Packit 4e8bc4
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT,
Packit 4e8bc4
                        NULL, page_id);
Packit 4e8bc4
                wrap.done = false;
Packit 4e8bc4
                wrap.submitted = false;
Packit 4e8bc4
                compacting = false;
Packit 4e8bc4
            } else if (wrap.submitted && wrap.done) {
Packit 4e8bc4
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START,
Packit 4e8bc4
                        NULL, page_id, page_offset);
Packit 4e8bc4
                storage_compact_readback(storage, l, drop_unread,
Packit 4e8bc4
                        readback_buf, page_id, page_version, settings.ext_wbuf_size);
Packit 4e8bc4
                page_offset += settings.ext_wbuf_size;
Packit 4e8bc4
                wrap.done = false;
Packit 4e8bc4
                wrap.submitted = false;
Packit 4e8bc4
            } else if (page_offset >= page_size) {
Packit 4e8bc4
                compacting = false;
Packit 4e8bc4
                wrap.done = false;
Packit 4e8bc4
                wrap.submitted = false;
Packit 4e8bc4
                extstore_close_page(storage, page_id, page_version);
Packit 4e8bc4
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END,
Packit 4e8bc4
                        NULL, page_id);
Packit 4e8bc4
            }
Packit 4e8bc4
            pthread_mutex_unlock(&wrap.lock);
Packit 4e8bc4
Packit 4e8bc4
            if (to_sleep > MIN_STORAGE_COMPACT_SLEEP)
Packit 4e8bc4
                to_sleep /= 2;
Packit 4e8bc4
        } else {
Packit 4e8bc4
            if (to_sleep < MAX_STORAGE_COMPACT_SLEEP)
Packit 4e8bc4
                to_sleep += MIN_STORAGE_COMPACT_SLEEP;
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
    free(readback_buf);
Packit 4e8bc4
Packit 4e8bc4
    return NULL;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
// TODO: provide a way to stop the storage compaction thread.
// logger needs logger_destroy() to exist/work before this is safe.
// NOTE: the disabled sketch below still uses the LRU maintainer's lock, run
// flag, thread id and messages; adapt it to this thread before enabling it.
/*int stop_storage_compact_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_compact_pause(void) {
Packit 4e8bc4
    pthread_mutex_lock(&storage_compact_plock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void storage_compact_resume(void) {
Packit 4e8bc4
    pthread_mutex_unlock(&storage_compact_plock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
int start_storage_compact_thread(void *arg) {
Packit 4e8bc4
    int ret;
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_init(&storage_compact_plock, NULL);
Packit 4e8bc4
    if ((ret = pthread_create(&storage_compact_tid, NULL,
Packit 4e8bc4
        storage_compact_thread, arg)) != 0) {
Packit 4e8bc4
        fprintf(stderr, "Can't create storage_compact thread: %s\n",
Packit 4e8bc4
            strerror(ret));
Packit 4e8bc4
        return -1;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    return 0;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*** UTILITY ***/
Packit 4e8bc4
// /path/to/file:100G:bucket1
Packit 4e8bc4
// FIXME: Modifies argument. copy instead?
Packit 4e8bc4
struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size) {
Packit 4e8bc4
    struct extstore_conf_file *cf = NULL;
Packit 4e8bc4
    char *b = NULL;
Packit 4e8bc4
    char *p = strtok_r(arg, ":", &b);
Packit 4e8bc4
    char unit = 0;
Packit 4e8bc4
    uint64_t multiplier = 0;
Packit 4e8bc4
    int base_size = 0;
Packit 4e8bc4
    if (p == NULL)
Packit 4e8bc4
        goto error;
Packit 4e8bc4
    // First arg is the filepath.
Packit 4e8bc4
    cf = calloc(1, sizeof(struct extstore_conf_file));
Packit 4e8bc4
    cf->file = strdup(p);
Packit 4e8bc4
Packit 4e8bc4
    p = strtok_r(NULL, ":", &b);
Packit 4e8bc4
    if (p == NULL) {
Packit 4e8bc4
        fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
Packit 4e8bc4
        goto error;
Packit 4e8bc4
    }
Packit 4e8bc4
    unit = tolower(p[strlen(p)-1]);
Packit 4e8bc4
    p[strlen(p)-1] = '\0';
Packit 4e8bc4
    // sigh.
Packit 4e8bc4
    switch (unit) {
Packit 4e8bc4
        case 'm':
Packit 4e8bc4
            multiplier = 1024 * 1024;
Packit 4e8bc4
            break;
Packit 4e8bc4
        case 'g':
Packit 4e8bc4
            multiplier = 1024 * 1024 * 1024;
Packit 4e8bc4
            break;
Packit 4e8bc4
        case 't':
Packit 4e8bc4
            multiplier = 1024 * 1024;
Packit 4e8bc4
            multiplier *= 1024 * 1024;
Packit 4e8bc4
            break;
Packit 4e8bc4
        case 'p':
Packit 4e8bc4
            multiplier = 1024 * 1024;
Packit 4e8bc4
            multiplier *= 1024 * 1024 * 1024;
Packit 4e8bc4
            break;
Packit 4e8bc4
    }
Packit 4e8bc4
    base_size = atoi(p);
Packit 4e8bc4
    multiplier *= base_size;
Packit 4e8bc4
    // page_count is nearest-but-not-larger-than pages * psize
Packit 4e8bc4
    cf->page_count = multiplier / page_size;
Packit 4e8bc4
    assert(page_size * cf->page_count <= multiplier);
Packit 4e8bc4
Packit 4e8bc4
    // final token would be a default free bucket
Packit 4e8bc4
    p = strtok_r(NULL, ",", &b);
Packit 4e8bc4
    // TODO: We reuse the original DEFINES for now,
Packit 4e8bc4
    // but if lowttl gets split up this needs to be its own set.
Packit 4e8bc4
    if (p != NULL) {
Packit 4e8bc4
        if (strcmp(p, "compact") == 0) {
Packit 4e8bc4
            cf->free_bucket = PAGE_BUCKET_COMPACT;
Packit 4e8bc4
        } else if (strcmp(p, "lowttl") == 0) {
Packit 4e8bc4
            cf->free_bucket = PAGE_BUCKET_LOWTTL;
Packit 4e8bc4
        } else if (strcmp(p, "chunked") == 0) {
Packit 4e8bc4
            cf->free_bucket = PAGE_BUCKET_CHUNKED;
Packit 4e8bc4
        } else if (strcmp(p, "default") == 0) {
Packit 4e8bc4
            cf->free_bucket = PAGE_BUCKET_DEFAULT;
Packit 4e8bc4
        } else {
Packit 4e8bc4
            fprintf(stderr, "Unknown extstore bucket: %s\n", p);
Packit 4e8bc4
            goto error;
Packit 4e8bc4
        }
Packit 4e8bc4
    } else {
Packit 4e8bc4
        // TODO: is this necessary?
Packit 4e8bc4
        cf->free_bucket = PAGE_BUCKET_DEFAULT;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    // TODO: disabling until compact algorithm is improved.
Packit 4e8bc4
    if (cf->free_bucket != PAGE_BUCKET_DEFAULT) {
Packit 4e8bc4
        fprintf(stderr, "ext_path only presently supports the default bucket\n");
Packit 4e8bc4
        goto error;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    return cf;
Packit 4e8bc4
error:
Packit 4e8bc4
    if (cf) {
Packit 4e8bc4
        if (cf->file)
Packit 4e8bc4
            free(cf->file);
Packit 4e8bc4
        free(cf);
Packit 4e8bc4
    }
Packit 4e8bc4
    return NULL;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
#endif