Blame assoc.c

Packit 4e8bc4
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
Packit 4e8bc4
/*
Packit 4e8bc4
 * Hash table
Packit 4e8bc4
 *
Packit 4e8bc4
 * The hash function used here is by Bob Jenkins, 1996:
Packit 4e8bc4
 *    <http://burtleburtle.net/bob/hash/doobs.html>
Packit 4e8bc4
 *       "By Bob Jenkins, 1996.  bob_jenkins@burtleburtle.net.
Packit 4e8bc4
 *       You may use this code any way you wish, private, educational,
Packit 4e8bc4
 *       or commercial.  It's free."
Packit 4e8bc4
 *
Packit 4e8bc4
 * The rest of the file is licensed under the BSD license.  See LICENSE.
Packit 4e8bc4
 */
Packit 4e8bc4
Packit 4e8bc4
#include "memcached.h"
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/resource.h>
#include <signal.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
Packit 4e8bc4
Packit 4e8bc4
/* Held by the maintenance thread whenever it is not blocked on
 * maintenance_cond; also taken by the code that signals the condition. */
static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER;

/* Signalled to wake the hash table maintenance thread. */
static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
Packit 4e8bc4
Packit 4e8bc4
/* Unsigned 4-byte quantity. */
typedef uint32_t ub4;

/* Unsigned 1-byte quantity. */
typedef unsigned char ub1;
Packit 4e8bc4
Packit 4e8bc4
/* how many powers of 2's worth of buckets we use */
Packit 4e8bc4
unsigned int hashpower = HASHPOWER_DEFAULT;
Packit 4e8bc4
Packit 4e8bc4
/* Number of buckets in a table of power n, i.e. 2^n. The shift is performed
 * in the 32-bit ub4 type, so n must be smaller than 32. */
#define hashsize(n) ((ub4)1 << (n))

/* Bit mask that reduces a hash value to a bucket index for power n. */
#define hashmask(n) (hashsize(n) - 1)
Packit 4e8bc4
Packit 4e8bc4
/* Main hash table. This is where we look except during expansion. */
Packit 4e8bc4
static item** primary_hashtable = 0;
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Previous hash table. During expansion, we look here for keys that haven't
Packit 4e8bc4
 * been moved over to the primary yet.
Packit 4e8bc4
 */
Packit 4e8bc4
static item** old_hashtable = 0;
Packit 4e8bc4
Packit 4e8bc4
/* Flag: Are we in the middle of expanding now? */
Packit 4e8bc4
static bool expanding = false;
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * During expansion we migrate values with bucket granularity; this is how
Packit 4e8bc4
 * far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1.
Packit 4e8bc4
 */
Packit 4e8bc4
static unsigned int expand_bucket = 0;
Packit 4e8bc4
Packit 4e8bc4
void assoc_init(const int hashtable_init) {
Packit 4e8bc4
    if (hashtable_init) {
Packit 4e8bc4
        hashpower = hashtable_init;
Packit 4e8bc4
    }
Packit 4e8bc4
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
Packit 4e8bc4
    if (! primary_hashtable) {
Packit 4e8bc4
        fprintf(stderr, "Failed to init hashtable.\n");
Packit 4e8bc4
        exit(EXIT_FAILURE);
Packit 4e8bc4
    }
Packit 4e8bc4
    STATS_LOCK();
Packit 4e8bc4
    stats_state.hash_power_level = hashpower;
Packit 4e8bc4
    stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
Packit 4e8bc4
    STATS_UNLOCK();
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
Packit 4e8bc4
    item *it;
Packit 4e8bc4
    unsigned int oldbucket;
Packit 4e8bc4
Packit 4e8bc4
    if (expanding &&
Packit 4e8bc4
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
Packit 4e8bc4
    {
Packit 4e8bc4
        it = old_hashtable[oldbucket];
Packit 4e8bc4
    } else {
Packit 4e8bc4
        it = primary_hashtable[hv & hashmask(hashpower)];
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    item *ret = NULL;
Packit 4e8bc4
    int depth = 0;
Packit 4e8bc4
    while (it) {
Packit 4e8bc4
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
Packit 4e8bc4
            ret = it;
Packit 4e8bc4
            break;
Packit 4e8bc4
        }
Packit 4e8bc4
        it = it->h_next;
Packit 4e8bc4
        ++depth;
Packit 4e8bc4
    }
Packit 4e8bc4
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
Packit 4e8bc4
    return ret;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* returns the address of the item pointer before the key.  if *item == 0,
Packit 4e8bc4
   the item wasn't found */
Packit 4e8bc4
Packit 4e8bc4
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
Packit 4e8bc4
    item **pos;
Packit 4e8bc4
    unsigned int oldbucket;
Packit 4e8bc4
Packit 4e8bc4
    if (expanding &&
Packit 4e8bc4
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
Packit 4e8bc4
    {
Packit 4e8bc4
        pos = &old_hashtable[oldbucket];
Packit 4e8bc4
    } else {
Packit 4e8bc4
        pos = &primary_hashtable[hv & hashmask(hashpower)];
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
Packit 4e8bc4
        pos = &(*pos)->h_next;
Packit 4e8bc4
    }
Packit 4e8bc4
    return pos;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* grows the hashtable to the next power of 2. */
Packit 4e8bc4
static void assoc_expand(void) {
Packit 4e8bc4
    old_hashtable = primary_hashtable;
Packit 4e8bc4
Packit 4e8bc4
    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
Packit 4e8bc4
    if (primary_hashtable) {
Packit 4e8bc4
        if (settings.verbose > 1)
Packit 4e8bc4
            fprintf(stderr, "Hash table expansion starting\n");
Packit 4e8bc4
        hashpower++;
Packit 4e8bc4
        expanding = true;
Packit 4e8bc4
        expand_bucket = 0;
Packit 4e8bc4
        STATS_LOCK();
Packit 4e8bc4
        stats_state.hash_power_level = hashpower;
Packit 4e8bc4
        stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
Packit 4e8bc4
        stats_state.hash_is_expanding = true;
Packit 4e8bc4
        STATS_UNLOCK();
Packit 4e8bc4
    } else {
Packit 4e8bc4
        primary_hashtable = old_hashtable;
Packit 4e8bc4
        /* Bad news, but we can keep running. */
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void assoc_start_expand(uint64_t curr_items) {
Packit 4e8bc4
    if (pthread_mutex_trylock(&maintenance_lock) == 0) {
Packit 4e8bc4
        if (curr_items > (hashsize(hashpower) * 3) / 2 && hashpower < HASHPOWER_MAX) {
Packit 4e8bc4
            pthread_cond_signal(&maintenance_cond);
Packit 4e8bc4
        }
Packit 4e8bc4
        pthread_mutex_unlock(&maintenance_lock);
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* Note: this isn't an assoc_update.  The key must not already exist to call this */
Packit 4e8bc4
int assoc_insert(item *it, const uint32_t hv) {
Packit 4e8bc4
    unsigned int oldbucket;
Packit 4e8bc4
Packit 4e8bc4
//    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */
Packit 4e8bc4
Packit 4e8bc4
    if (expanding &&
Packit 4e8bc4
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
Packit 4e8bc4
    {
Packit 4e8bc4
        it->h_next = old_hashtable[oldbucket];
Packit 4e8bc4
        old_hashtable[oldbucket] = it;
Packit 4e8bc4
    } else {
Packit 4e8bc4
        it->h_next = primary_hashtable[hv & hashmask(hashpower)];
Packit 4e8bc4
        primary_hashtable[hv & hashmask(hashpower)] = it;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
Packit 4e8bc4
    return 1;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
Packit 4e8bc4
    item **before = _hashitem_before(key, nkey, hv);
Packit 4e8bc4
Packit 4e8bc4
    if (*before) {
Packit 4e8bc4
        item *nxt;
Packit 4e8bc4
        /* The DTrace probe cannot be triggered as the last instruction
Packit 4e8bc4
         * due to possible tail-optimization by the compiler
Packit 4e8bc4
         */
Packit 4e8bc4
        MEMCACHED_ASSOC_DELETE(key, nkey);
Packit 4e8bc4
        nxt = (*before)->h_next;
Packit 4e8bc4
        (*before)->h_next = 0;   /* probably pointless, but whatever. */
Packit 4e8bc4
        *before = nxt;
Packit 4e8bc4
        return;
Packit 4e8bc4
    }
Packit 4e8bc4
    /* Note:  we never actually get here.  the callers don't delete things
Packit 4e8bc4
       they can't find. */
Packit 4e8bc4
    assert(*before != 0);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
Packit 4e8bc4
/* Run flag for the maintenance thread's main loop; the loop keeps going
 * while this is non-zero. */
static volatile int do_run_maintenance_thread = 1;

/* Upper bound on the number of bucket migration attempts made per pass of
 * the maintenance loop. */
#define DEFAULT_HASH_BULK_MOVE 1
int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
Packit 4e8bc4
Packit 4e8bc4
static void *assoc_maintenance_thread(void *arg) {
Packit 4e8bc4
Packit 4e8bc4
    mutex_lock(&maintenance_lock);
Packit 4e8bc4
    while (do_run_maintenance_thread) {
Packit 4e8bc4
        int ii = 0;
Packit 4e8bc4
Packit 4e8bc4
        /* There is only one expansion thread, so no need to global lock. */
Packit 4e8bc4
        for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
Packit 4e8bc4
            item *it, *next;
Packit 4e8bc4
            unsigned int bucket;
Packit 4e8bc4
            void *item_lock = NULL;
Packit 4e8bc4
Packit 4e8bc4
            /* bucket = hv & hashmask(hashpower) =>the bucket of hash table
Packit 4e8bc4
             * is the lowest N bits of the hv, and the bucket of item_locks is
Packit 4e8bc4
             *  also the lowest M bits of hv, and N is greater than M.
Packit 4e8bc4
             *  So we can process expanding with only one item_lock. cool! */
Packit 4e8bc4
            if ((item_lock = item_trylock(expand_bucket))) {
Packit 4e8bc4
                    for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
Packit 4e8bc4
                        next = it->h_next;
Packit 4e8bc4
                        bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
Packit 4e8bc4
                        it->h_next = primary_hashtable[bucket];
Packit 4e8bc4
                        primary_hashtable[bucket] = it;
Packit 4e8bc4
                    }
Packit 4e8bc4
Packit 4e8bc4
                    old_hashtable[expand_bucket] = NULL;
Packit 4e8bc4
Packit 4e8bc4
                    expand_bucket++;
Packit 4e8bc4
                    if (expand_bucket == hashsize(hashpower - 1)) {
Packit 4e8bc4
                        expanding = false;
Packit 4e8bc4
                        free(old_hashtable);
Packit 4e8bc4
                        STATS_LOCK();
Packit 4e8bc4
                        stats_state.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
Packit 4e8bc4
                        stats_state.hash_is_expanding = false;
Packit 4e8bc4
                        STATS_UNLOCK();
Packit 4e8bc4
                        if (settings.verbose > 1)
Packit 4e8bc4
                            fprintf(stderr, "Hash table expansion done\n");
Packit 4e8bc4
                    }
Packit 4e8bc4
Packit 4e8bc4
            } else {
Packit 4e8bc4
                usleep(10*1000);
Packit 4e8bc4
            }
Packit 4e8bc4
Packit 4e8bc4
            if (item_lock) {
Packit 4e8bc4
                item_trylock_unlock(item_lock);
Packit 4e8bc4
                item_lock = NULL;
Packit 4e8bc4
            }
Packit 4e8bc4
        }
Packit 4e8bc4
Packit 4e8bc4
        if (!expanding) {
Packit 4e8bc4
            /* We are done expanding.. just wait for next invocation */
Packit 4e8bc4
            pthread_cond_wait(&maintenance_cond, &maintenance_lock);
Packit 4e8bc4
            /* assoc_expand() swaps out the hash table entirely, so we need
Packit 4e8bc4
             * all threads to not hold any references related to the hash
Packit 4e8bc4
             * table while this happens.
Packit 4e8bc4
             * This is instead of a more complex, possibly slower algorithm to
Packit 4e8bc4
             * allow dynamic hash table expansion without causing significant
Packit 4e8bc4
             * wait times.
Packit 4e8bc4
             */
Packit 4e8bc4
            if (do_run_maintenance_thread) {
Packit 4e8bc4
                pause_threads(PAUSE_ALL_THREADS);
Packit 4e8bc4
                assoc_expand();
Packit 4e8bc4
                pause_threads(RESUME_ALL_THREADS);
Packit 4e8bc4
            }
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
    mutex_unlock(&maintenance_lock);
Packit 4e8bc4
    return NULL;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* Thread id of the hash table maintenance thread: filled in by
 * pthread_create() when the thread is started and passed to pthread_join()
 * when it is stopped. */
static pthread_t maintenance_tid;
Packit 4e8bc4
Packit 4e8bc4
int start_assoc_maintenance_thread() {
Packit 4e8bc4
    int ret;
Packit 4e8bc4
    char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
Packit 4e8bc4
    if (env != NULL) {
Packit 4e8bc4
        hash_bulk_move = atoi(env);
Packit 4e8bc4
        if (hash_bulk_move == 0) {
Packit 4e8bc4
            hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    if ((ret = pthread_create(&maintenance_tid, NULL,
Packit 4e8bc4
                              assoc_maintenance_thread, NULL)) != 0) {
Packit 4e8bc4
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
Packit 4e8bc4
        return -1;
Packit 4e8bc4
    }
Packit 4e8bc4
    return 0;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void stop_assoc_maintenance_thread() {
Packit 4e8bc4
    mutex_lock(&maintenance_lock);
Packit 4e8bc4
    do_run_maintenance_thread = 0;
Packit 4e8bc4
    pthread_cond_signal(&maintenance_cond);
Packit 4e8bc4
    mutex_unlock(&maintenance_lock);
Packit 4e8bc4
Packit 4e8bc4
    /* Wait for the maintenance thread to stop */
Packit 4e8bc4
    pthread_join(maintenance_tid, NULL);
Packit 4e8bc4
}
Packit 4e8bc4