Blame crawler.c

Packit 4e8bc4
/*  Copyright 2016 Netflix.
Packit 4e8bc4
 *
Packit 4e8bc4
 *  Use and distribution licensed under the BSD license.  See
Packit 4e8bc4
 *  the LICENSE file for full text.
Packit 4e8bc4
 */
Packit 4e8bc4
Packit 4e8bc4
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
Packit 4e8bc4
#include "memcached.h"
Packit 4e8bc4
#include <sys/stat.h>
Packit 4e8bc4
#include <sys/socket.h>
Packit 4e8bc4
#include <sys/resource.h>
Packit 4e8bc4
#include <fcntl.h>
Packit 4e8bc4
#include <netinet/in.h>
Packit 4e8bc4
#include <errno.h>
Packit 4e8bc4
#include <stdlib.h>
Packit 4e8bc4
#include <stdio.h>
Packit 4e8bc4
#include <signal.h>
Packit 4e8bc4
#include <string.h>
Packit 4e8bc4
#include <time.h>
Packit 4e8bc4
#include <assert.h>
Packit 4e8bc4
#include <unistd.h>
Packit 4e8bc4
#include <poll.h>
Packit 4e8bc4
Packit 4e8bc4
/* Exclusive upper bound on the LRU ids the crawler tracks and iterates. */
#define LARGEST_ID POWER_LARGEST
Packit 4e8bc4
Packit 4e8bc4
typedef struct {
Packit 4e8bc4
    void *c; /* original connection structure. still with source thread attached. */
Packit 4e8bc4
    int sfd; /* client fd. */
Packit 4e8bc4
    bipbuf_t *buf; /* output buffer */
Packit 4e8bc4
    char *cbuf; /* current buffer */
Packit 4e8bc4
} crawler_client_t;
Packit 4e8bc4
Packit 4e8bc4
typedef struct _crawler_module_t crawler_module_t;
Packit 4e8bc4
Packit 4e8bc4
typedef void (*crawler_eval_func)(crawler_module_t *cm, item *it, uint32_t hv, int slab_cls);
Packit 4e8bc4
typedef int (*crawler_init_func)(crawler_module_t *cm, void *data); // TODO: init args?
Packit 4e8bc4
typedef void (*crawler_deinit_func)(crawler_module_t *cm); // TODO: extra args?
Packit 4e8bc4
typedef void (*crawler_doneclass_func)(crawler_module_t *cm, int slab_cls);
Packit 4e8bc4
typedef void (*crawler_finalize_func)(crawler_module_t *cm);
Packit 4e8bc4
Packit 4e8bc4
typedef struct {
Packit 4e8bc4
    crawler_init_func init; /* run before crawl starts */
Packit 4e8bc4
    crawler_eval_func eval; /* runs on an item. */
Packit 4e8bc4
    crawler_doneclass_func doneclass; /* runs once per sub-crawler completion. */
Packit 4e8bc4
    crawler_finalize_func finalize; /* runs once when all sub-crawlers are done. */
Packit 4e8bc4
    bool needs_lock; /* whether or not we need the LRU lock held when eval is called */
Packit 4e8bc4
    bool needs_client; /* whether or not to grab onto the remote client */
Packit 4e8bc4
} crawler_module_reg_t;
Packit 4e8bc4
Packit 4e8bc4
struct _crawler_module_t {
Packit 4e8bc4
    void *data; /* opaque data pointer */
Packit 4e8bc4
    crawler_client_t c;
Packit 4e8bc4
    crawler_module_reg_t *mod;
Packit 4e8bc4
};
Packit 4e8bc4
Packit 4e8bc4
static int crawler_expired_init(crawler_module_t *cm, void *data);
Packit 4e8bc4
static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls);
Packit 4e8bc4
static void crawler_expired_finalize(crawler_module_t *cm);
Packit 4e8bc4
static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
Packit 4e8bc4
Packit 4e8bc4
crawler_module_reg_t crawler_expired_mod = {
Packit 4e8bc4
    .init = crawler_expired_init,
Packit 4e8bc4
    .eval = crawler_expired_eval,
Packit 4e8bc4
    .doneclass = crawler_expired_doneclass,
Packit 4e8bc4
    .finalize = crawler_expired_finalize,
Packit 4e8bc4
    .needs_lock = true,
Packit 4e8bc4
    .needs_client = false
Packit 4e8bc4
};
Packit 4e8bc4
Packit 4e8bc4
static void crawler_metadump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
Packit 4e8bc4
static void crawler_metadump_finalize(crawler_module_t *cm);
Packit 4e8bc4
Packit 4e8bc4
crawler_module_reg_t crawler_metadump_mod = {
Packit 4e8bc4
    .init = NULL,
Packit 4e8bc4
    .eval = crawler_metadump_eval,
Packit 4e8bc4
    .doneclass = NULL,
Packit 4e8bc4
    .finalize = crawler_metadump_finalize,
Packit 4e8bc4
    .needs_lock = false,
Packit 4e8bc4
    .needs_client = true
Packit 4e8bc4
};
Packit 4e8bc4
Packit 4e8bc4
crawler_module_reg_t *crawler_mod_regs[3] = {
Packit 4e8bc4
    &crawler_expired_mod,
Packit 4e8bc4
    &crawler_expired_mod,
Packit 4e8bc4
    &crawler_metadump_mod
Packit 4e8bc4
};
Packit 4e8bc4
Packit 4e8bc4
static int lru_crawler_client_getbuf(crawler_client_t *c);
Packit 4e8bc4
crawler_module_t active_crawler_mod;
Packit 4e8bc4
enum crawler_run_type active_crawler_type;
Packit 4e8bc4
Packit 4e8bc4
static crawler crawlers[LARGEST_ID];
Packit 4e8bc4
Packit 4e8bc4
static int crawler_count = 0;
Packit 4e8bc4
static volatile int do_run_lru_crawler_thread = 0;
Packit 4e8bc4
static int lru_crawler_initialized = 0;
Packit 4e8bc4
static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
Packit 4e8bc4
static pthread_cond_t  lru_crawler_cond = PTHREAD_COND_INITIALIZER;
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
/* TODO: pass this around */
Packit 4e8bc4
static void *storage;
Packit 4e8bc4
#endif
Packit 4e8bc4
Packit 4e8bc4
/* Will crawl all slab classes a minimum of once per hour.
 * Value is in seconds; parenthesized so the macro is safe inside larger
 * expressions (e.g. as a divisor).
 */
#define MAX_MAINTCRAWL_WAIT (60 * 60)
Packit 4e8bc4
Packit 4e8bc4
/*** LRU CRAWLER THREAD ***/
Packit 4e8bc4
Packit 4e8bc4
/* Bytes reserved from the client's bip buffer for each write. */
#define LRU_CRAWLER_WRITEBUF 8192
Packit 4e8bc4
Packit 4e8bc4
/* Drop the attached client for good: close its connection through
 * sidethread_conn_close(), free the output buffer, and reset the client
 * fields so the rest of the crawler sees "no client" (c->c == NULL).
 */
static void lru_crawler_close_client(crawler_client_t *c) {
    sidethread_conn_close(c->c);
    bipbuf_free(c->buf);

    c->c = NULL;
    c->buf = NULL;
    c->cbuf = NULL;
}
Packit 4e8bc4
Packit 4e8bc4
/* Detach the client without closing it: the connection is handed to
 * redispatch_conn(), the output buffer is freed, and the client fields are
 * reset so the crawler sees "no client" (c->c == NULL).
 */
static void lru_crawler_release_client(crawler_client_t *c) {
    redispatch_conn(c->c);
    bipbuf_free(c->buf);

    c->c = NULL;
    c->buf = NULL;
    c->cbuf = NULL;
}
Packit 4e8bc4
Packit 4e8bc4
/* Prepare the expired-item module for a new crawl.
 *
 * data: caller-supplied struct crawler_expired_data to fill in, or NULL to
 *       have a private one allocated (and its lock initialised) here.
 * Returns 0 on success, -1 if the private allocation fails; in that case
 * cm->data is left untouched.
 *
 * In both cases the per-class statistics are zeroed and stamped with the
 * current time while holding the data lock.
 */
static int crawler_expired_init(crawler_module_t *cm, void *data) {
    struct crawler_expired_data *d = data;

    if (d != NULL) {
        /* Caller owns this block; finalize must not free it. */
        d->is_external = true;
    } else {
        d = calloc(1, sizeof(struct crawler_expired_data));
        if (d == NULL) {
            return -1;
        }
        pthread_mutex_init(&d->lock, NULL);
        d->is_external = false;
        d->start_time = current_time;
    }
    cm->data = d;

    pthread_mutex_lock(&d->lock);
    memset(&d->crawlerstats, 0, sizeof(crawlerstats_t) * POWER_LARGEST);
    for (int cls = 0; cls < POWER_LARGEST; cls++) {
        d->crawlerstats[cls].start_time = current_time;
        d->crawlerstats[cls].run_complete = false;
    }
    pthread_mutex_unlock(&d->lock);
    return 0;
}
Packit 4e8bc4
Packit 4e8bc4
/* Record, under the data lock, that the crawl of slab_cls has finished:
 * stamp its end time and flag the run as complete.
 */
static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls) {
    struct crawler_expired_data *d = cm->data;
    crawlerstats_t *s;

    pthread_mutex_lock(&d->lock);
    s = &d->crawlerstats[slab_cls];
    s->end_time = current_time;
    s->run_complete = true;
    pthread_mutex_unlock(&d->lock);
}
Packit 4e8bc4
Packit 4e8bc4
/* Mark the whole crawl as complete and release module-owned state.
 *
 * The end time and crawl_complete flag are published under the data lock.
 * Ownership (is_external) is sampled while the lock is still held, because
 * once crawl_complete is visible the owner of an external block may do
 * anything with it. A block allocated by crawler_expired_init() has its
 * mutex destroyed and is freed, and cm->data is cleared so no dangling
 * pointer survives.
 */
static void crawler_expired_finalize(crawler_module_t *cm) {
    struct crawler_expired_data *d = cm->data;
    bool owned;

    pthread_mutex_lock(&d->lock);
    d->end_time = current_time;
    d->crawl_complete = true;
    owned = !d->is_external;
    pthread_mutex_unlock(&d->lock);

    if (owned) {
        pthread_mutex_destroy(&d->lock);
        free(d);
        cm->data = NULL;
    }
}
Packit 4e8bc4
Packit 4e8bc4
/* I pulled this out to make the main thread clearer, but it reaches into the
Packit 4e8bc4
 * main thread's values too much. Should rethink again.
Packit 4e8bc4
 */
Packit 4e8bc4
static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i) {
Packit 4e8bc4
    struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
Packit 4e8bc4
    pthread_mutex_lock(&d->lock);
Packit 4e8bc4
    crawlerstats_t *s = &d->crawlerstats[i];
Packit 4e8bc4
    int is_flushed = item_is_flushed(search);
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
    bool is_valid = true;
Packit 4e8bc4
    if (search->it_flags & ITEM_HDR) {
Packit 4e8bc4
        item_hdr *hdr = (item_hdr *)ITEM_data(search);
Packit 4e8bc4
        if (extstore_check(storage, hdr->page_id, hdr->page_version) != 0)
Packit 4e8bc4
            is_valid = false;
Packit 4e8bc4
    }
Packit 4e8bc4
#endif
Packit 4e8bc4
    if ((search->exptime != 0 && search->exptime < current_time)
Packit 4e8bc4
        || is_flushed
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
        || !is_valid
Packit 4e8bc4
#endif
Packit 4e8bc4
        ) {
Packit 4e8bc4
        crawlers[i].reclaimed++;
Packit 4e8bc4
        s->reclaimed++;
Packit 4e8bc4
Packit 4e8bc4
        if (settings.verbose > 1) {
Packit 4e8bc4
            int ii;
Packit 4e8bc4
            char *key = ITEM_key(search);
Packit 4e8bc4
            fprintf(stderr, "LRU crawler found an expired item (flags: %d, slab: %d): ",
Packit 4e8bc4
                search->it_flags, search->slabs_clsid);
Packit 4e8bc4
            for (ii = 0; ii < search->nkey; ++ii) {
Packit 4e8bc4
                fprintf(stderr, "%c", key[ii]);
Packit 4e8bc4
            }
Packit 4e8bc4
            fprintf(stderr, "\n");
Packit 4e8bc4
        }
Packit 4e8bc4
        if ((search->it_flags & ITEM_FETCHED) == 0 && !is_flushed) {
Packit 4e8bc4
            crawlers[i].unfetched++;
Packit 4e8bc4
        }
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
        STORAGE_delete(storage, search);
Packit 4e8bc4
#endif
Packit 4e8bc4
        do_item_unlink_nolock(search, hv);
Packit 4e8bc4
        do_item_remove(search);
Packit 4e8bc4
    } else {
Packit 4e8bc4
        s->seen++;
Packit 4e8bc4
        refcount_decr(search);
Packit 4e8bc4
        if (search->exptime == 0) {
Packit 4e8bc4
            s->noexp++;
Packit 4e8bc4
        } else if (search->exptime - current_time > 3599) {
Packit 4e8bc4
            s->ttl_hourplus++;
Packit 4e8bc4
        } else {
Packit 4e8bc4
            rel_time_t ttl_remain = search->exptime - current_time;
Packit 4e8bc4
            int bucket = ttl_remain / 60;
Packit 4e8bc4
            if (bucket <= 60) {
Packit 4e8bc4
                s->histo[bucket]++;
Packit 4e8bc4
            }
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
    pthread_mutex_unlock(&d->lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* Per-item callback of the metadump module: format one metadata line for a
 * live item into the client's reserved write region and commit it.
 *
 * Expired or flushed items are skipped. In every case the reference the
 * crawler took on the item is released here. hv and i are unused.
 *
 * The write region (cm->c.cbuf) is reserved by lru_crawler_client_getbuf()
 * with LRU_CRAWLER_WRITEBUF bytes, so snprintf() is bounded by that same
 * size. This keeps the truncation check below consistent with what was
 * actually written: a line is only pushed if it fit completely.
 */
static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
    char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
    int is_flushed = item_is_flushed(it);
    /* Ignore expired content. */
    if ((it->exptime != 0 && it->exptime < current_time)
        || is_flushed) {
        refcount_decr(it);
        return;
    }
    // TODO: uriencode directly into the buffer.
    uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_URI_ENCODED_LENGTH);
    int total = snprintf(cm->c.cbuf, LRU_CRAWLER_WRITEBUF,
            "key=%s exp=%ld la=%llu cas=%llu fetch=%s cls=%u size=%lu\n",
            keybuf,
            (it->exptime == 0) ? -1 : (long)(it->exptime + process_started),
            (unsigned long long)(it->time + process_started),
            (unsigned long long)ITEM_get_cas(it),
            (it->it_flags & ITEM_FETCHED) ? "yes" : "no",
            ITEM_clsid(it),
            (unsigned long) ITEM_ntotal(it));
    refcount_decr(it);
    // TODO: some way of tracking the errors. these are very unlikely though.
    if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0) {
        /* Formatting failed or the line did not fit; don't push it. */
        return;
    }
    bipbuf_push(cm->c.buf, total);
}
Packit 4e8bc4
Packit 4e8bc4
/* Queue the terminating "END\r\n" line for the attached client, if any.
 *
 * Reserving buffer space can fail: lru_crawler_client_getbuf() returns
 * non-zero when the client was closed while waiting (its buffers are then
 * NULL) or when polling failed (cbuf is then stale). In both cases nothing
 * may be written, so the terminator is skipped.
 */
static void crawler_metadump_finalize(crawler_module_t *cm) {
    if (cm->c.c == NULL) {
        return;
    }
    // Ensure space for final message.
    if (lru_crawler_client_getbuf(&cm->c) != 0) {
        return;
    }
    memcpy(cm->c.cbuf, "END\r\n", 5);
    bipbuf_push(cm->c.buf, 5);
}
Packit 4e8bc4
Packit 4e8bc4
/* Wait up to one second for the client socket to become writable and flush
 * as much queued output as a single write accepts.
 *
 * Returns 0 when the caller may try again (timeout, interrupted poll,
 * partial or complete write, nothing queued) and -1 on failure. When the
 * failure is a closed or broken connection the client is closed here, which
 * resets c->c to NULL; a failing poll() itself leaves the client attached.
 */
static int lru_crawler_poll(crawler_client_t *c) {
    unsigned char *data;
    unsigned int data_size = 0;
    struct pollfd to_poll[1];
    to_poll[0].fd = c->sfd;
    to_poll[0].events = POLLOUT;
    to_poll[0].revents = 0;

    int ret = poll(to_poll, 1, 1000);

    if (ret < 0) {
        /* A signal interrupting poll() says nothing about the socket;
         * report "try again" instead of failing the crawl. */
        if (errno == EINTR) {
            return 0;
        }
        // fatal.
        return -1;
    }

    if (ret == 0) return 0;

    /* NOTE(review): POLLIN is not requested in .events above, so this branch
     * only runs if the platform reports it unasked - confirm whether POLLIN
     * was meant to be polled for. */
    if (to_poll[0].revents & POLLIN) {
        char buf[1];
        int res = ((conn*)c->c)->read(c->c, buf, 1);
        if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
            lru_crawler_close_client(c);
            return -1;
        }
    }
    if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) {
        if (to_poll[0].revents & (POLLHUP|POLLERR)) {
            lru_crawler_close_client(c);
            return -1;
        } else if (to_poll[0].revents & POLLOUT) {
            int total = ((conn*)c->c)->write(c->c, data, data_size);
            if (total == -1) {
                /* Would-block is fine; anything else kills the client. */
                if (errno != EAGAIN && errno != EWOULDBLOCK) {
                    lru_crawler_close_client(c);
                    return -1;
                }
            } else if (total == 0) {
                lru_crawler_close_client(c);
                return -1;
            } else {
                /* Drop the bytes that were actually sent from the buffer. */
                bipbuf_poll(c->buf, total);
            }
        }
    }
    return 0;
}
Packit 4e8bc4
Packit 4e8bc4
/* Grab some space to work with, if none exists, run the poll() loop and wait
Packit 4e8bc4
 * for it to clear up or close.
Packit 4e8bc4
 * Return NULL if closed.
Packit 4e8bc4
 */
Packit 4e8bc4
static int lru_crawler_client_getbuf(crawler_client_t *c) {
Packit 4e8bc4
    void *buf = NULL;
Packit 4e8bc4
    if (c->c == NULL) return -1;
Packit 4e8bc4
    /* not enough space. */
Packit 4e8bc4
    while ((buf = bipbuf_request(c->buf, LRU_CRAWLER_WRITEBUF)) == NULL) {
Packit 4e8bc4
        // TODO: max loops before closing.
Packit 4e8bc4
        int ret = lru_crawler_poll(c);
Packit 4e8bc4
        if (ret < 0) return ret;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    c->cbuf = buf;
Packit 4e8bc4
    return 0;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
static void lru_crawler_class_done(int i) {
Packit 4e8bc4
    crawlers[i].it_flags = 0;
Packit 4e8bc4
    crawler_count--;
Packit 4e8bc4
    do_item_unlinktail_q((item *)&crawlers[i]);
Packit 4e8bc4
    do_item_stats_add_crawl(i, crawlers[i].reclaimed,
Packit 4e8bc4
            crawlers[i].unfetched, crawlers[i].checked);
Packit 4e8bc4
    pthread_mutex_unlock(&lru_locks[i]);
Packit 4e8bc4
    if (active_crawler_mod.mod->doneclass != NULL)
Packit 4e8bc4
        active_crawler_mod.mod->doneclass(&active_crawler_mod, i);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
static void *item_crawler_thread(void *arg) {
Packit 4e8bc4
    int i;
Packit 4e8bc4
    int crawls_persleep = settings.crawls_persleep;
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_lock(&lru_crawler_lock);
Packit 4e8bc4
    pthread_cond_signal(&lru_crawler_cond);
Packit 4e8bc4
    settings.lru_crawler = true;
Packit 4e8bc4
    if (settings.verbose > 2)
Packit 4e8bc4
        fprintf(stderr, "Starting LRU crawler background thread\n");
Packit 4e8bc4
    while (do_run_lru_crawler_thread) {
Packit 4e8bc4
    pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
Packit 4e8bc4
Packit 4e8bc4
    while (crawler_count) {
Packit 4e8bc4
        item *search = NULL;
Packit 4e8bc4
        void *hold_lock = NULL;
Packit 4e8bc4
Packit 4e8bc4
        for (i = POWER_SMALLEST; i < LARGEST_ID; i++) {
Packit 4e8bc4
            if (crawlers[i].it_flags != 1) {
Packit 4e8bc4
                continue;
Packit 4e8bc4
            }
Packit 4e8bc4
Packit 4e8bc4
            /* Get memory from bipbuf, if client has no space, flush. */
Packit 4e8bc4
            if (active_crawler_mod.c.c != NULL) {
Packit 4e8bc4
                int ret = lru_crawler_client_getbuf(&active_crawler_mod.c);
Packit 4e8bc4
                if (ret != 0) {
Packit 4e8bc4
                    lru_crawler_class_done(i);
Packit 4e8bc4
                    continue;
Packit 4e8bc4
                }
Packit 4e8bc4
            } else if (active_crawler_mod.mod->needs_client) {
Packit 4e8bc4
                lru_crawler_class_done(i);
Packit 4e8bc4
                continue;
Packit 4e8bc4
            }
Packit 4e8bc4
            pthread_mutex_lock(&lru_locks[i]);
Packit 4e8bc4
            search = do_item_crawl_q((item *)&crawlers[i]);
Packit 4e8bc4
            if (search == NULL ||
Packit 4e8bc4
                (crawlers[i].remaining && --crawlers[i].remaining < 1)) {
Packit 4e8bc4
                if (settings.verbose > 2)
Packit 4e8bc4
                    fprintf(stderr, "Nothing left to crawl for %d\n", i);
Packit 4e8bc4
                lru_crawler_class_done(i);
Packit 4e8bc4
                continue;
Packit 4e8bc4
            }
Packit 4e8bc4
            uint32_t hv = hash(ITEM_key(search), search->nkey);
Packit 4e8bc4
            /* Attempt to hash item lock the "search" item. If locked, no
Packit 4e8bc4
             * other callers can incr the refcount
Packit 4e8bc4
             */
Packit 4e8bc4
            if ((hold_lock = item_trylock(hv)) == NULL) {
Packit 4e8bc4
                pthread_mutex_unlock(&lru_locks[i]);
Packit 4e8bc4
                continue;
Packit 4e8bc4
            }
Packit 4e8bc4
            /* Now see if the item is refcount locked */
Packit 4e8bc4
            if (refcount_incr(search) != 2) {
Packit 4e8bc4
                refcount_decr(search);
Packit 4e8bc4
                if (hold_lock)
Packit 4e8bc4
                    item_trylock_unlock(hold_lock);
Packit 4e8bc4
                pthread_mutex_unlock(&lru_locks[i]);
Packit 4e8bc4
                continue;
Packit 4e8bc4
            }
Packit 4e8bc4
Packit 4e8bc4
            crawlers[i].checked++;
Packit 4e8bc4
            /* Frees the item or decrements the refcount. */
Packit 4e8bc4
            /* Interface for this could improve: do the free/decr here
Packit 4e8bc4
             * instead? */
Packit 4e8bc4
            if (!active_crawler_mod.mod->needs_lock) {
Packit 4e8bc4
                pthread_mutex_unlock(&lru_locks[i]);
Packit 4e8bc4
            }
Packit 4e8bc4
Packit 4e8bc4
            active_crawler_mod.mod->eval(&active_crawler_mod, search, hv, i);
Packit 4e8bc4
Packit 4e8bc4
            if (hold_lock)
Packit 4e8bc4
                item_trylock_unlock(hold_lock);
Packit 4e8bc4
            if (active_crawler_mod.mod->needs_lock) {
Packit 4e8bc4
                pthread_mutex_unlock(&lru_locks[i]);
Packit 4e8bc4
            }
Packit 4e8bc4
Packit 4e8bc4
            if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
Packit 4e8bc4
                pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
                usleep(settings.lru_crawler_sleep);
Packit 4e8bc4
                pthread_mutex_lock(&lru_crawler_lock);
Packit 4e8bc4
                crawls_persleep = settings.crawls_persleep;
Packit 4e8bc4
            } else if (!settings.lru_crawler_sleep) {
Packit 4e8bc4
                // TODO: only cycle lock every N?
Packit 4e8bc4
                pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
                pthread_mutex_lock(&lru_crawler_lock);
Packit 4e8bc4
            }
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    if (active_crawler_mod.mod != NULL) {
Packit 4e8bc4
        if (active_crawler_mod.mod->finalize != NULL)
Packit 4e8bc4
            active_crawler_mod.mod->finalize(&active_crawler_mod);
Packit 4e8bc4
        while (active_crawler_mod.c.c != NULL && bipbuf_used(active_crawler_mod.c.buf)) {
Packit 4e8bc4
            lru_crawler_poll(&active_crawler_mod.c);
Packit 4e8bc4
        }
Packit 4e8bc4
        // Double checking in case the client closed during the poll
Packit 4e8bc4
        if (active_crawler_mod.c.c != NULL) {
Packit 4e8bc4
            lru_crawler_release_client(&active_crawler_mod.c);
Packit 4e8bc4
        }
Packit 4e8bc4
        active_crawler_mod.mod = NULL;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    if (settings.verbose > 2)
Packit 4e8bc4
        fprintf(stderr, "LRU crawler thread sleeping\n");
Packit 4e8bc4
Packit 4e8bc4
    STATS_LOCK();
Packit 4e8bc4
    stats_state.lru_crawler_running = false;
Packit 4e8bc4
    STATS_UNLOCK();
Packit 4e8bc4
    }
Packit 4e8bc4
    pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
    if (settings.verbose > 2)
Packit 4e8bc4
        fprintf(stderr, "LRU crawler thread stopping\n");
Packit 4e8bc4
    settings.lru_crawler = false;
Packit 4e8bc4
Packit 4e8bc4
    return NULL;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
static pthread_t item_crawler_tid;
Packit 4e8bc4
Packit 4e8bc4
int stop_item_crawler_thread(bool wait) {
Packit 4e8bc4
    int ret;
Packit 4e8bc4
    pthread_mutex_lock(&lru_crawler_lock);
Packit 4e8bc4
    if (do_run_lru_crawler_thread == 0) {
Packit 4e8bc4
        pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
        return 0;
Packit 4e8bc4
    }
Packit 4e8bc4
    do_run_lru_crawler_thread = 0;
Packit 4e8bc4
    pthread_cond_signal(&lru_crawler_cond);
Packit 4e8bc4
    pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
    if (wait && (ret = pthread_join(item_crawler_tid, NULL)) != 0) {
Packit 4e8bc4
        fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret));
Packit 4e8bc4
        return -1;
Packit 4e8bc4
    }
Packit 4e8bc4
    return 0;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* Lock dance to "block" until thread is waiting on its condition:
Packit 4e8bc4
 * caller locks mtx. caller spawns thread.
Packit 4e8bc4
 * thread blocks on mutex.
Packit 4e8bc4
 * caller waits on condition, releases lock.
Packit 4e8bc4
 * thread gets lock, sends signal.
Packit 4e8bc4
 * caller can't wait, as thread has lock.
Packit 4e8bc4
 * thread waits on condition, releases lock
Packit 4e8bc4
 * caller wakes on condition, gets lock.
Packit 4e8bc4
 * caller immediately releases lock.
Packit 4e8bc4
 * thread is now safely waiting on condition before the caller returns.
Packit 4e8bc4
 */
Packit 4e8bc4
int start_item_crawler_thread(void) {
Packit 4e8bc4
    int ret;
Packit 4e8bc4
Packit 4e8bc4
    if (settings.lru_crawler)
Packit 4e8bc4
        return -1;
Packit 4e8bc4
    pthread_mutex_lock(&lru_crawler_lock);
Packit 4e8bc4
    do_run_lru_crawler_thread = 1;
Packit 4e8bc4
    if ((ret = pthread_create(&item_crawler_tid, NULL,
Packit 4e8bc4
        item_crawler_thread, NULL)) != 0) {
Packit 4e8bc4
        fprintf(stderr, "Can't create LRU crawler thread: %s\n",
Packit 4e8bc4
            strerror(ret));
Packit 4e8bc4
        pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
        return -1;
Packit 4e8bc4
    }
Packit 4e8bc4
    /* Avoid returning until the crawler has actually started */
Packit 4e8bc4
    pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
Packit 4e8bc4
    pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
Packit 4e8bc4
    return 0;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
Packit 4e8bc4
 * LRU every time.
Packit 4e8bc4
 */
Packit 4e8bc4
static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
Packit 4e8bc4
    uint32_t sid = id;
Packit 4e8bc4
    int starts = 0;
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_lock(&lru_locks[sid]);
Packit 4e8bc4
    if (crawlers[sid].it_flags == 0) {
Packit 4e8bc4
        if (settings.verbose > 2)
Packit 4e8bc4
            fprintf(stderr, "Kicking LRU crawler off for LRU %u\n", sid);
Packit 4e8bc4
        crawlers[sid].nbytes = 0;
Packit 4e8bc4
        crawlers[sid].nkey = 0;
Packit 4e8bc4
        crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */
Packit 4e8bc4
        crawlers[sid].next = 0;
Packit 4e8bc4
        crawlers[sid].prev = 0;
Packit 4e8bc4
        crawlers[sid].time = 0;
Packit 4e8bc4
        if (remaining == LRU_CRAWLER_CAP_REMAINING) {
Packit 4e8bc4
            remaining = do_get_lru_size(sid);
Packit 4e8bc4
        }
Packit 4e8bc4
        /* Values for remaining:
Packit 4e8bc4
         * remaining = 0
Packit 4e8bc4
         * - scan all elements, until a NULL is reached
Packit 4e8bc4
         * - if empty, NULL is reached right away
Packit 4e8bc4
         * remaining = n + 1
Packit 4e8bc4
         * - first n elements are parsed (or until a NULL is reached)
Packit 4e8bc4
         */
Packit 4e8bc4
        if (remaining) remaining++;
Packit 4e8bc4
        crawlers[sid].remaining = remaining;
Packit 4e8bc4
        crawlers[sid].slabs_clsid = sid;
Packit 4e8bc4
        crawlers[sid].reclaimed = 0;
Packit 4e8bc4
        crawlers[sid].unfetched = 0;
Packit 4e8bc4
        crawlers[sid].checked = 0;
Packit 4e8bc4
        do_item_linktail_q((item *)&crawlers[sid]);
Packit 4e8bc4
        crawler_count++;
Packit 4e8bc4
        starts++;
Packit 4e8bc4
    }
Packit 4e8bc4
    pthread_mutex_unlock(&lru_locks[sid]);
Packit 4e8bc4
    if (starts) {
Packit 4e8bc4
        STATS_LOCK();
Packit 4e8bc4
        stats_state.lru_crawler_running = true;
Packit 4e8bc4
        stats.lru_crawler_starts++;
Packit 4e8bc4
        STATS_UNLOCK();
Packit 4e8bc4
    }
Packit 4e8bc4
    return starts;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* Attach a client connection to the module and give it an output buffer.
 *
 * Returns 0 on success, -1 if a client is already attached, -2 if the
 * buffer cannot be allocated. The buffer is allocated before any field is
 * touched, so a failed call leaves the module with no client attached
 * instead of a client pointer paired with a NULL buffer.
 */
static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) {
    crawler_client_t *crawlc = &cm->c;
    if (crawlc->c != NULL) {
        return -1;
    }

    bipbuf_t *buf = bipbuf_new(1024 * 128);
    if (buf == NULL) {
        return -2;
    }

    crawlc->c = c;
    crawlc->sfd = sfd;
    crawlc->buf = buf;
    return 0;
}
Packit 4e8bc4
Packit 4e8bc4
int lru_crawler_start(uint8_t *ids, uint32_t remaining,
Packit 4e8bc4
                             const enum crawler_run_type type, void *data,
Packit 4e8bc4
                             void *c, const int sfd) {
Packit 4e8bc4
    int starts = 0;
Packit 4e8bc4
    bool is_running;
Packit 4e8bc4
    static rel_time_t block_ae_until = 0;
Packit 4e8bc4
    pthread_mutex_lock(&lru_crawler_lock);
Packit 4e8bc4
    STATS_LOCK();
Packit 4e8bc4
    is_running = stats_state.lru_crawler_running;
Packit 4e8bc4
    STATS_UNLOCK();
Packit 4e8bc4
    if (do_run_lru_crawler_thread == 0) {
Packit 4e8bc4
        pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
        return -2;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    if (is_running &&
Packit 4e8bc4
            !(type == CRAWLER_AUTOEXPIRE && active_crawler_type == CRAWLER_AUTOEXPIRE)) {
Packit 4e8bc4
        pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
        block_ae_until = current_time + 60;
Packit 4e8bc4
        return -1;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    if (type == CRAWLER_AUTOEXPIRE && block_ae_until > current_time) {
Packit 4e8bc4
        pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
        return -1;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    /* Configure the module */
Packit 4e8bc4
    if (!is_running) {
Packit 4e8bc4
        assert(crawler_mod_regs[type] != NULL);
Packit 4e8bc4
        active_crawler_mod.mod = crawler_mod_regs[type];
Packit 4e8bc4
        active_crawler_type = type;
Packit 4e8bc4
        if (active_crawler_mod.mod->init != NULL) {
Packit 4e8bc4
            active_crawler_mod.mod->init(&active_crawler_mod, data);
Packit 4e8bc4
        }
Packit 4e8bc4
        if (active_crawler_mod.mod->needs_client) {
Packit 4e8bc4
            if (c == NULL || sfd == 0) {
Packit 4e8bc4
                pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
                return -2;
Packit 4e8bc4
            }
Packit 4e8bc4
            if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) {
Packit 4e8bc4
                pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
                return -2;
Packit 4e8bc4
            }
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    /* we allow the autocrawler to restart sub-LRU's before completion */
Packit 4e8bc4
    for (int sid = POWER_SMALLEST; sid < POWER_LARGEST; sid++) {
Packit 4e8bc4
        if (ids[sid])
Packit 4e8bc4
            starts += do_lru_crawler_start(sid, remaining);
Packit 4e8bc4
    }
Packit 4e8bc4
    if (starts) {
Packit 4e8bc4
        pthread_cond_signal(&lru_crawler_cond);
Packit 4e8bc4
    }
Packit 4e8bc4
    pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
    return starts;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Also only clear the crawlerstats once per sid.
Packit 4e8bc4
 */
Packit 4e8bc4
enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type,
Packit 4e8bc4
        void *c, const int sfd, unsigned int remaining) {
Packit 4e8bc4
    char *b = NULL;
Packit 4e8bc4
    uint32_t sid = 0;
Packit 4e8bc4
    int starts = 0;
Packit 4e8bc4
    uint8_t tocrawl[POWER_LARGEST];
Packit 4e8bc4
Packit 4e8bc4
    /* FIXME: I added this while debugging. Don't think it's needed? */
Packit 4e8bc4
    memset(tocrawl, 0, sizeof(uint8_t) * POWER_LARGEST);
Packit 4e8bc4
    if (strcmp(slabs, "all") == 0) {
Packit 4e8bc4
        for (sid = 0; sid < POWER_LARGEST; sid++) {
Packit 4e8bc4
            tocrawl[sid] = 1;
Packit 4e8bc4
        }
Packit 4e8bc4
    } else {
Packit 4e8bc4
        for (char *p = strtok_r(slabs, ",", &b);
Packit 4e8bc4
             p != NULL;
Packit 4e8bc4
             p = strtok_r(NULL, ",", &b)) {
Packit 4e8bc4
Packit 4e8bc4
            if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST
Packit 4e8bc4
                    || sid >= MAX_NUMBER_OF_SLAB_CLASSES) {
Packit 4e8bc4
                pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
                return CRAWLER_BADCLASS;
Packit 4e8bc4
            }
Packit 4e8bc4
            tocrawl[sid | TEMP_LRU] = 1;
Packit 4e8bc4
            tocrawl[sid | HOT_LRU] = 1;
Packit 4e8bc4
            tocrawl[sid | WARM_LRU] = 1;
Packit 4e8bc4
            tocrawl[sid | COLD_LRU] = 1;
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    starts = lru_crawler_start(tocrawl, remaining, type, NULL, c, sfd);
Packit 4e8bc4
    if (starts == -1) {
Packit 4e8bc4
        return CRAWLER_RUNNING;
Packit 4e8bc4
    } else if (starts == -2) {
Packit 4e8bc4
        return CRAWLER_ERROR; /* FIXME: not very helpful. */
Packit 4e8bc4
    } else if (starts) {
Packit 4e8bc4
        return CRAWLER_OK;
Packit 4e8bc4
    } else {
Packit 4e8bc4
        return CRAWLER_NOTSTARTED;
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* If we hold this lock, crawler can't wake up or move */
Packit 4e8bc4
void lru_crawler_pause(void) {
Packit 4e8bc4
    pthread_mutex_lock(&lru_crawler_lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void lru_crawler_resume(void) {
Packit 4e8bc4
    pthread_mutex_unlock(&lru_crawler_lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
int init_lru_crawler(void *arg) {
Packit 4e8bc4
    if (lru_crawler_initialized == 0) {
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
        storage = arg;
Packit 4e8bc4
#endif
Packit 4e8bc4
        active_crawler_mod.c.c = NULL;
Packit 4e8bc4
        active_crawler_mod.mod = NULL;
Packit 4e8bc4
        active_crawler_mod.data = NULL;
Packit 4e8bc4
        lru_crawler_initialized = 1;
Packit 4e8bc4
    }
Packit 4e8bc4
    return 0;
Packit 4e8bc4
}