Blame thread.c

Packit 4e8bc4
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
Packit 4e8bc4
/*
Packit 4e8bc4
 * Thread management for memcached.
Packit 4e8bc4
 */
Packit 4e8bc4
#include "memcached.h"
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
#include "storage.h"
Packit 4e8bc4
#endif
Packit 4e8bc4
#include <assert.h>
Packit 4e8bc4
#include <stdio.h>
Packit 4e8bc4
#include <errno.h>
Packit 4e8bc4
#include <stdlib.h>
Packit 4e8bc4
#include <string.h>
Packit 4e8bc4
#include <pthread.h>
Packit 4e8bc4
Packit 4e8bc4
#ifdef __sun
Packit 4e8bc4
#include <atomic.h>
Packit 4e8bc4
#endif
Packit 4e8bc4
Packit 4e8bc4
#ifdef TLS
Packit 4e8bc4
#include <openssl/ssl.h>
Packit 4e8bc4
#endif
Packit 4e8bc4
Packit 4e8bc4
#define ITEMS_PER_ALLOC 64
Packit 4e8bc4
Packit 4e8bc4
/* An item in the connection queue. */
Packit 4e8bc4
enum conn_queue_item_modes {
Packit 4e8bc4
    queue_new_conn,   /* brand new connection. */
Packit 4e8bc4
    queue_redispatch, /* redispatching from side thread */
Packit 4e8bc4
};
Packit 4e8bc4
typedef struct conn_queue_item CQ_ITEM;
Packit 4e8bc4
struct conn_queue_item {
Packit 4e8bc4
    int               sfd;
Packit 4e8bc4
    enum conn_states  init_state;
Packit 4e8bc4
    int               event_flags;
Packit 4e8bc4
    int               read_buffer_size;
Packit 4e8bc4
    enum network_transport     transport;
Packit 4e8bc4
    enum conn_queue_item_modes mode;
Packit 4e8bc4
    conn *c;
Packit 4e8bc4
    void    *ssl;
Packit 4e8bc4
    CQ_ITEM          *next;
Packit 4e8bc4
};
Packit 4e8bc4
Packit 4e8bc4
/* A connection queue. */
Packit 4e8bc4
typedef struct conn_queue CQ;
Packit 4e8bc4
struct conn_queue {
Packit 4e8bc4
    CQ_ITEM *head;
Packit 4e8bc4
    CQ_ITEM *tail;
Packit 4e8bc4
    pthread_mutex_t lock;
Packit 4e8bc4
};
Packit 4e8bc4
Packit 4e8bc4
/* Locks for cache LRU operations */
Packit 4e8bc4
pthread_mutex_t lru_locks[POWER_LARGEST];
Packit 4e8bc4
Packit 4e8bc4
/* Connection lock around accepting new connections */
Packit 4e8bc4
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
Packit 4e8bc4
Packit 4e8bc4
#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
Packit 4e8bc4
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
Packit 4e8bc4
#endif
Packit 4e8bc4
Packit 4e8bc4
/* Lock for global stats */
Packit 4e8bc4
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
Packit 4e8bc4
Packit 4e8bc4
/* Lock to cause worker threads to hang up after being woken */
Packit 4e8bc4
static pthread_mutex_t worker_hang_lock;
Packit 4e8bc4
Packit 4e8bc4
/* Free list of CQ_ITEM structs */
Packit 4e8bc4
static CQ_ITEM *cqi_freelist;
Packit 4e8bc4
static pthread_mutex_t cqi_freelist_lock;
Packit 4e8bc4
Packit 4e8bc4
static pthread_mutex_t *item_locks;
Packit 4e8bc4
/* size of the item lock hash table */
Packit 4e8bc4
static uint32_t item_lock_count;
Packit 4e8bc4
unsigned int item_lock_hashpower;
Packit 4e8bc4
#define hashsize(n) ((unsigned long int)1<<(n))
Packit 4e8bc4
#define hashmask(n) (hashsize(n)-1)
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Each libevent instance has a wakeup pipe, which other threads
Packit 4e8bc4
 * can use to signal that they've put a new connection on its queue.
Packit 4e8bc4
 */
Packit 4e8bc4
static LIBEVENT_THREAD *threads;
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Number of worker threads that have finished setting themselves up.
Packit 4e8bc4
 */
Packit 4e8bc4
static int init_count = 0;
Packit 4e8bc4
static pthread_mutex_t init_lock;
Packit 4e8bc4
static pthread_cond_t init_cond;
Packit 4e8bc4
Packit 4e8bc4
Packit 4e8bc4
static void thread_libevent_process(int fd, short which, void *arg);
Packit 4e8bc4
Packit 4e8bc4
/* item_lock() must be held for an item before any modifications to either its
Packit 4e8bc4
 * associated hash bucket, or the structure itself.
Packit 4e8bc4
 * LRU modifications must hold the item lock, and the LRU lock.
Packit 4e8bc4
 * LRU's accessing items must item_trylock() before modifying an item.
Packit 4e8bc4
 * Items accessible from an LRU must not be freed or modified
Packit 4e8bc4
 * without first locking and removing from the LRU.
Packit 4e8bc4
 */
Packit 4e8bc4
Packit 4e8bc4
void item_lock(uint32_t hv) {
Packit 4e8bc4
    mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void *item_trylock(uint32_t hv) {
Packit 4e8bc4
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
Packit 4e8bc4
    if (pthread_mutex_trylock(lock) == 0) {
Packit 4e8bc4
        return lock;
Packit 4e8bc4
    }
Packit 4e8bc4
    return NULL;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void item_trylock_unlock(void *lock) {
Packit 4e8bc4
    mutex_unlock((pthread_mutex_t *) lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void item_unlock(uint32_t hv) {
Packit 4e8bc4
    mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
static void wait_for_thread_registration(int nthreads) {
Packit 4e8bc4
    while (init_count < nthreads) {
Packit 4e8bc4
        pthread_cond_wait(&init_cond, &init_lock);
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
static void register_thread_initialized(void) {
Packit 4e8bc4
    pthread_mutex_lock(&init_lock);
Packit 4e8bc4
    init_count++;
Packit 4e8bc4
    pthread_cond_signal(&init_cond);
Packit 4e8bc4
    pthread_mutex_unlock(&init_lock);
Packit 4e8bc4
    /* Force worker threads to pile up if someone wants us to */
Packit 4e8bc4
    pthread_mutex_lock(&worker_hang_lock);
Packit 4e8bc4
    pthread_mutex_unlock(&worker_hang_lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* Must not be called with any deeper locks held */
Packit 4e8bc4
void pause_threads(enum pause_thread_types type) {
Packit 4e8bc4
    char buf[1];
Packit 4e8bc4
    int i;
Packit 4e8bc4
Packit 4e8bc4
    buf[0] = 0;
Packit 4e8bc4
    switch (type) {
Packit 4e8bc4
        case PAUSE_ALL_THREADS:
Packit 4e8bc4
            slabs_rebalancer_pause();
Packit 4e8bc4
            lru_maintainer_pause();
Packit 4e8bc4
            lru_crawler_pause();
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
            storage_compact_pause();
Packit 4e8bc4
            storage_write_pause();
Packit 4e8bc4
#endif
Packit 4e8bc4
        case PAUSE_WORKER_THREADS:
Packit 4e8bc4
            buf[0] = 'p';
Packit 4e8bc4
            pthread_mutex_lock(&worker_hang_lock);
Packit 4e8bc4
            break;
Packit 4e8bc4
        case RESUME_ALL_THREADS:
Packit 4e8bc4
            slabs_rebalancer_resume();
Packit 4e8bc4
            lru_maintainer_resume();
Packit 4e8bc4
            lru_crawler_resume();
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
            storage_compact_resume();
Packit 4e8bc4
            storage_write_resume();
Packit 4e8bc4
#endif
Packit 4e8bc4
        case RESUME_WORKER_THREADS:
Packit 4e8bc4
            pthread_mutex_unlock(&worker_hang_lock);
Packit 4e8bc4
            break;
Packit 4e8bc4
        default:
Packit 4e8bc4
            fprintf(stderr, "Unknown lock type: %d\n", type);
Packit 4e8bc4
            assert(1 == 0);
Packit 4e8bc4
            break;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    /* Only send a message if we have one. */
Packit 4e8bc4
    if (buf[0] == 0) {
Packit 4e8bc4
        return;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_lock(&init_lock);
Packit 4e8bc4
    init_count = 0;
Packit 4e8bc4
    for (i = 0; i < settings.num_threads; i++) {
Packit 4e8bc4
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
Packit 4e8bc4
            perror("Failed writing to notify pipe");
Packit 4e8bc4
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
    wait_for_thread_registration(settings.num_threads);
Packit 4e8bc4
    pthread_mutex_unlock(&init_lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
// MUST not be called with any deeper locks held
Packit 4e8bc4
// MUST be called only by parent thread
Packit 4e8bc4
// Note: listener thread is the "main" event base, which has exited its
Packit 4e8bc4
// loop in order to call this function.
Packit 4e8bc4
void stop_threads(void) {
Packit 4e8bc4
    char buf[1];
Packit 4e8bc4
    int i;
Packit 4e8bc4
Packit 4e8bc4
    // assoc can call pause_threads(), so we have to stop it first.
Packit 4e8bc4
    stop_assoc_maintenance_thread();
Packit 4e8bc4
    if (settings.verbose > 0)
Packit 4e8bc4
        fprintf(stderr, "stopped assoc\n");
Packit 4e8bc4
Packit 4e8bc4
    if (settings.verbose > 0)
Packit 4e8bc4
        fprintf(stderr, "asking workers to stop\n");
Packit 4e8bc4
    buf[0] = 's';
Packit Service a9e202
    pthread_mutex_lock(&worker_hang_lock);
Packit 4e8bc4
    pthread_mutex_lock(&init_lock);
Packit 4e8bc4
    init_count = 0;
Packit 4e8bc4
    for (i = 0; i < settings.num_threads; i++) {
Packit 4e8bc4
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
Packit 4e8bc4
            perror("Failed writing to notify pipe");
Packit 4e8bc4
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
    wait_for_thread_registration(settings.num_threads);
Packit 4e8bc4
    pthread_mutex_unlock(&init_lock);
Packit 4e8bc4
Packit Service a9e202
    // All of the workers are hung but haven't done cleanup yet.
Packit Service a9e202
Packit 4e8bc4
    if (settings.verbose > 0)
Packit 4e8bc4
        fprintf(stderr, "asking background threads to stop\n");
Packit 4e8bc4
Packit 4e8bc4
    // stop each side thread.
Packit 4e8bc4
    // TODO: Verify these all work if the threads are already stopped
Packit 4e8bc4
    stop_item_crawler_thread(CRAWLER_WAIT);
Packit 4e8bc4
    if (settings.verbose > 0)
Packit 4e8bc4
        fprintf(stderr, "stopped lru crawler\n");
Packit Service 73fdd3
    if (settings.lru_maintainer_thread) {
Packit Service 73fdd3
        stop_lru_maintainer_thread();
Packit Service 73fdd3
        if (settings.verbose > 0)
Packit Service 73fdd3
            fprintf(stderr, "stopped maintainer\n");
Packit Service 73fdd3
    }
Packit Service 73fdd3
    if (settings.slab_reassign) {
Packit Service 73fdd3
        stop_slab_maintenance_thread();
Packit Service 73fdd3
        if (settings.verbose > 0)
Packit Service 73fdd3
            fprintf(stderr, "stopped slab mover\n");
Packit Service 73fdd3
    }
Packit 4e8bc4
    logger_stop();
Packit 4e8bc4
    if (settings.verbose > 0)
Packit 4e8bc4
        fprintf(stderr, "stopped logger thread\n");
Packit 4e8bc4
    stop_conn_timeout_thread();
Packit 4e8bc4
    if (settings.verbose > 0)
Packit 4e8bc4
        fprintf(stderr, "stopped idle timeout thread\n");
Packit 4e8bc4
Packit Service a9e202
    // Close all connections then let the workers finally exit.
Packit Service a9e202
    if (settings.verbose > 0)
Packit Service a9e202
        fprintf(stderr, "closing connections\n");
Packit Service a9e202
    conn_close_all();
Packit Service a9e202
    pthread_mutex_unlock(&worker_hang_lock);
Packit Service a9e202
    if (settings.verbose > 0)
Packit Service a9e202
        fprintf(stderr, "reaping worker threads\n");
Packit Service a9e202
    for (i = 0; i < settings.num_threads; i++) {
Packit Service a9e202
        pthread_join(threads[i].thread_id, NULL);
Packit Service a9e202
    }
Packit Service a9e202
Packit 4e8bc4
    if (settings.verbose > 0)
Packit 4e8bc4
        fprintf(stderr, "all background threads stopped\n");
Packit 4e8bc4
Packit 4e8bc4
    // At this point, every background thread must be stopped.
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Initializes a connection queue.
Packit 4e8bc4
 */
Packit 4e8bc4
static void cq_init(CQ *cq) {
Packit 4e8bc4
    pthread_mutex_init(&cq->lock, NULL);
Packit 4e8bc4
    cq->head = NULL;
Packit 4e8bc4
    cq->tail = NULL;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Looks for an item on a connection queue, but doesn't block if there isn't
Packit 4e8bc4
 * one.
Packit 4e8bc4
 * Returns the item, or NULL if no item is available
Packit 4e8bc4
 */
Packit 4e8bc4
static CQ_ITEM *cq_pop(CQ *cq) {
Packit 4e8bc4
    CQ_ITEM *item;
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_lock(&cq->lock);
Packit 4e8bc4
    item = cq->head;
Packit 4e8bc4
    if (NULL != item) {
Packit 4e8bc4
        cq->head = item->next;
Packit 4e8bc4
        if (NULL == cq->head)
Packit 4e8bc4
            cq->tail = NULL;
Packit 4e8bc4
    }
Packit 4e8bc4
    pthread_mutex_unlock(&cq->lock);
Packit 4e8bc4
Packit 4e8bc4
    return item;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Adds an item to a connection queue.
Packit 4e8bc4
 */
Packit 4e8bc4
static void cq_push(CQ *cq, CQ_ITEM *item) {
Packit 4e8bc4
    item->next = NULL;
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_lock(&cq->lock);
Packit 4e8bc4
    if (NULL == cq->tail)
Packit 4e8bc4
        cq->head = item;
Packit 4e8bc4
    else
Packit 4e8bc4
        cq->tail->next = item;
Packit 4e8bc4
    cq->tail = item;
Packit 4e8bc4
    pthread_mutex_unlock(&cq->lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Returns a fresh connection queue item.
Packit 4e8bc4
 */
Packit 4e8bc4
static CQ_ITEM *cqi_new(void) {
Packit 4e8bc4
    CQ_ITEM *item = NULL;
Packit 4e8bc4
    pthread_mutex_lock(&cqi_freelist_lock);
Packit 4e8bc4
    if (cqi_freelist) {
Packit 4e8bc4
        item = cqi_freelist;
Packit 4e8bc4
        cqi_freelist = item->next;
Packit 4e8bc4
    }
Packit 4e8bc4
    pthread_mutex_unlock(&cqi_freelist_lock);
Packit 4e8bc4
Packit 4e8bc4
    if (NULL == item) {
Packit 4e8bc4
        int i;
Packit 4e8bc4
Packit 4e8bc4
        /* Allocate a bunch of items at once to reduce fragmentation */
Packit 4e8bc4
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
Packit 4e8bc4
        if (NULL == item) {
Packit 4e8bc4
            STATS_LOCK();
Packit 4e8bc4
            stats.malloc_fails++;
Packit 4e8bc4
            STATS_UNLOCK();
Packit 4e8bc4
            return NULL;
Packit 4e8bc4
        }
Packit 4e8bc4
Packit 4e8bc4
        /*
Packit 4e8bc4
         * Link together all the new items except the first one
Packit 4e8bc4
         * (which we'll return to the caller) for placement on
Packit 4e8bc4
         * the freelist.
Packit 4e8bc4
         */
Packit 4e8bc4
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
Packit 4e8bc4
            item[i - 1].next = &item[i];
Packit 4e8bc4
Packit 4e8bc4
        pthread_mutex_lock(&cqi_freelist_lock);
Packit 4e8bc4
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
Packit 4e8bc4
        cqi_freelist = &item[1];
Packit 4e8bc4
        pthread_mutex_unlock(&cqi_freelist_lock);
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    return item;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Frees a connection queue item (adds it to the freelist.)
Packit 4e8bc4
 */
Packit 4e8bc4
static void cqi_free(CQ_ITEM *item) {
Packit 4e8bc4
    pthread_mutex_lock(&cqi_freelist_lock);
Packit 4e8bc4
    item->next = cqi_freelist;
Packit 4e8bc4
    cqi_freelist = item;
Packit 4e8bc4
    pthread_mutex_unlock(&cqi_freelist_lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Creates a worker thread.
Packit 4e8bc4
 */
Packit 4e8bc4
static void create_worker(void *(*func)(void *), void *arg) {
Packit 4e8bc4
    pthread_attr_t  attr;
Packit 4e8bc4
    int             ret;
Packit 4e8bc4
Packit 4e8bc4
    pthread_attr_init(&attr);
Packit 4e8bc4
Packit 4e8bc4
    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
Packit 4e8bc4
        fprintf(stderr, "Can't create thread: %s\n",
Packit 4e8bc4
                strerror(ret));
Packit 4e8bc4
        exit(1);
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Sets whether or not we accept new connections.
Packit 4e8bc4
 */
Packit 4e8bc4
void accept_new_conns(const bool do_accept) {
Packit 4e8bc4
    pthread_mutex_lock(&conn_lock);
Packit 4e8bc4
    do_accept_new_conns(do_accept);
Packit 4e8bc4
    pthread_mutex_unlock(&conn_lock);
Packit 4e8bc4
}
Packit 4e8bc4
/****************************** LIBEVENT THREADS *****************************/
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Set up a thread's information.
Packit 4e8bc4
 */
Packit 4e8bc4
static void setup_thread(LIBEVENT_THREAD *me) {
Packit 4e8bc4
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
Packit 4e8bc4
    struct event_config *ev_config;
Packit 4e8bc4
    ev_config = event_config_new();
Packit 4e8bc4
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
Packit 4e8bc4
    me->base = event_base_new_with_config(ev_config);
Packit 4e8bc4
    event_config_free(ev_config);
Packit 4e8bc4
#else
Packit 4e8bc4
    me->base = event_init();
Packit 4e8bc4
#endif
Packit 4e8bc4
Packit 4e8bc4
    if (! me->base) {
Packit 4e8bc4
        fprintf(stderr, "Can't allocate event base\n");
Packit 4e8bc4
        exit(1);
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    /* Listen for notifications from other threads */
Packit 4e8bc4
    event_set(&me->notify_event, me->notify_receive_fd,
Packit 4e8bc4
              EV_READ | EV_PERSIST, thread_libevent_process, me);
Packit 4e8bc4
    event_base_set(me->base, &me->notify_event);
Packit 4e8bc4
Packit 4e8bc4
    if (event_add(&me->notify_event, 0) == -1) {
Packit 4e8bc4
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
Packit 4e8bc4
        exit(1);
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
Packit 4e8bc4
    if (me->new_conn_queue == NULL) {
Packit 4e8bc4
        perror("Failed to allocate memory for connection queue");
Packit 4e8bc4
        exit(EXIT_FAILURE);
Packit 4e8bc4
    }
Packit 4e8bc4
    cq_init(me->new_conn_queue);
Packit 4e8bc4
Packit 4e8bc4
    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
Packit 4e8bc4
        perror("Failed to initialize mutex");
Packit 4e8bc4
        exit(EXIT_FAILURE);
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
Packit 4e8bc4
                                    NULL, NULL);
Packit 4e8bc4
    if (me->suffix_cache == NULL) {
Packit 4e8bc4
        fprintf(stderr, "Failed to create suffix cache\n");
Packit 4e8bc4
        exit(EXIT_FAILURE);
Packit 4e8bc4
    }
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
    me->io_cache = cache_create("io", sizeof(io_wrap), sizeof(char*), NULL, NULL);
Packit 4e8bc4
    if (me->io_cache == NULL) {
Packit 4e8bc4
        fprintf(stderr, "Failed to create IO object cache\n");
Packit 4e8bc4
        exit(EXIT_FAILURE);
Packit 4e8bc4
    }
Packit 4e8bc4
#endif
Packit 4e8bc4
#ifdef TLS
Packit 4e8bc4
    if (settings.ssl_enabled) {
Packit 4e8bc4
        me->ssl_wbuf = (char *)malloc((size_t)settings.ssl_wbuf_size);
Packit 4e8bc4
        if (me->ssl_wbuf == NULL) {
Packit 4e8bc4
            fprintf(stderr, "Failed to allocate the SSL write buffer\n");
Packit 4e8bc4
            exit(EXIT_FAILURE);
Packit 4e8bc4
        }
Packit 4e8bc4
    }
Packit 4e8bc4
#endif
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Worker thread: main event loop
Packit 4e8bc4
 */
Packit 4e8bc4
static void *worker_libevent(void *arg) {
Packit 4e8bc4
    LIBEVENT_THREAD *me = arg;
Packit 4e8bc4
Packit 4e8bc4
    /* Any per-thread setup can happen here; memcached_thread_init() will block until
Packit 4e8bc4
     * all threads have finished initializing.
Packit 4e8bc4
     */
Packit 4e8bc4
    me->l = logger_create();
Packit 4e8bc4
    me->lru_bump_buf = item_lru_bump_buf_create();
Packit 4e8bc4
    if (me->l == NULL || me->lru_bump_buf == NULL) {
Packit 4e8bc4
        abort();
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    if (settings.drop_privileges) {
Packit 4e8bc4
        drop_worker_privileges();
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    register_thread_initialized();
Packit 4e8bc4
Packit 4e8bc4
    event_base_loop(me->base, 0);
Packit 4e8bc4
Packit 4e8bc4
    // same mechanism used to watch for all threads exiting.
Packit 4e8bc4
    register_thread_initialized();
Packit 4e8bc4
Packit 4e8bc4
    event_base_free(me->base);
Packit 4e8bc4
    return NULL;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Processes an incoming "handle a new connection" item. This is called when
Packit 4e8bc4
 * input arrives on the libevent wakeup pipe.
Packit 4e8bc4
 */
Packit 4e8bc4
static void thread_libevent_process(int fd, short which, void *arg) {
Packit 4e8bc4
    LIBEVENT_THREAD *me = arg;
Packit 4e8bc4
    CQ_ITEM *item;
Packit 4e8bc4
    char buf[1];
Packit 4e8bc4
    conn *c;
Packit 4e8bc4
    unsigned int timeout_fd;
Packit 4e8bc4
Packit 4e8bc4
    if (read(fd, buf, 1) != 1) {
Packit 4e8bc4
        if (settings.verbose > 0)
Packit 4e8bc4
            fprintf(stderr, "Can't read from libevent pipe\n");
Packit 4e8bc4
        return;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    switch (buf[0]) {
Packit 4e8bc4
    case 'c':
Packit 4e8bc4
        item = cq_pop(me->new_conn_queue);
Packit 4e8bc4
Packit 4e8bc4
        if (NULL == item) {
Packit 4e8bc4
            break;
Packit 4e8bc4
        }
Packit 4e8bc4
        switch (item->mode) {
Packit 4e8bc4
            case queue_new_conn:
Packit 4e8bc4
                c = conn_new(item->sfd, item->init_state, item->event_flags,
Packit 4e8bc4
                                   item->read_buffer_size, item->transport,
Packit 4e8bc4
                                   me->base, item->ssl);
Packit 4e8bc4
                if (c == NULL) {
Packit 4e8bc4
                    if (IS_UDP(item->transport)) {
Packit 4e8bc4
                        fprintf(stderr, "Can't listen for events on UDP socket\n");
Packit 4e8bc4
                        exit(1);
Packit 4e8bc4
                    } else {
Packit 4e8bc4
                        if (settings.verbose > 0) {
Packit 4e8bc4
                            fprintf(stderr, "Can't listen for events on fd %d\n",
Packit 4e8bc4
                                item->sfd);
Packit 4e8bc4
                        }
Packit 4e8bc4
#ifdef TLS
Packit 4e8bc4
                        if (item->ssl) {
Packit 4e8bc4
                            SSL_shutdown(item->ssl);
Packit 4e8bc4
                            SSL_free(item->ssl);
Packit 4e8bc4
                        }
Packit 4e8bc4
#endif
Packit 4e8bc4
                        close(item->sfd);
Packit 4e8bc4
                    }
Packit 4e8bc4
                } else {
Packit 4e8bc4
                    c->thread = me;
Packit 4e8bc4
#ifdef TLS
Packit 4e8bc4
                    if (settings.ssl_enabled && c->ssl != NULL) {
Packit 4e8bc4
                        assert(c->thread && c->thread->ssl_wbuf);
Packit 4e8bc4
                        c->ssl_wbuf = c->thread->ssl_wbuf;
Packit 4e8bc4
                    }
Packit 4e8bc4
#endif
Packit 4e8bc4
                }
Packit 4e8bc4
                break;
Packit 4e8bc4
Packit 4e8bc4
            case queue_redispatch:
Packit 4e8bc4
                conn_worker_readd(item->c);
Packit 4e8bc4
                break;
Packit 4e8bc4
        }
Packit 4e8bc4
        cqi_free(item);
Packit 4e8bc4
        break;
Packit 4e8bc4
    /* we were told to pause and report in */
Packit 4e8bc4
    case 'p':
Packit 4e8bc4
        register_thread_initialized();
Packit 4e8bc4
        break;
Packit 4e8bc4
    /* a client socket timed out */
Packit 4e8bc4
    case 't':
Packit 4e8bc4
        if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
Packit 4e8bc4
            if (settings.verbose > 0)
Packit 4e8bc4
                fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
Packit 4e8bc4
            return;
Packit 4e8bc4
        }
Packit 4e8bc4
        conn_close_idle(conns[timeout_fd]);
Packit 4e8bc4
        break;
Packit 4e8bc4
    /* asked to stop */
Packit 4e8bc4
    case 's':
Packit 4e8bc4
        event_base_loopexit(me->base, NULL);
Packit 4e8bc4
        break;
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* Which thread we assigned a connection to most recently. */
Packit 4e8bc4
static int last_thread = -1;
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Dispatches a new connection to another thread. This is only ever called
Packit 4e8bc4
 * from the main thread, either during initialization (for UDP) or because
Packit 4e8bc4
 * of an incoming connection.
Packit 4e8bc4
 */
Packit 4e8bc4
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
Packit 4e8bc4
                       int read_buffer_size, enum network_transport transport, void *ssl) {
Packit 4e8bc4
    CQ_ITEM *item = cqi_new();
Packit 4e8bc4
    char buf[1];
Packit 4e8bc4
    if (item == NULL) {
Packit 4e8bc4
        close(sfd);
Packit 4e8bc4
        /* given that malloc failed this may also fail, but let's try */
Packit 4e8bc4
        fprintf(stderr, "Failed to allocate memory for connection object\n");
Packit 4e8bc4
        return;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    int tid = (last_thread + 1) % settings.num_threads;
Packit 4e8bc4
Packit 4e8bc4
    LIBEVENT_THREAD *thread = threads + tid;
Packit 4e8bc4
Packit 4e8bc4
    last_thread = tid;
Packit 4e8bc4
Packit 4e8bc4
    item->sfd = sfd;
Packit 4e8bc4
    item->init_state = init_state;
Packit 4e8bc4
    item->event_flags = event_flags;
Packit 4e8bc4
    item->read_buffer_size = read_buffer_size;
Packit 4e8bc4
    item->transport = transport;
Packit 4e8bc4
    item->mode = queue_new_conn;
Packit 4e8bc4
    item->ssl = ssl;
Packit 4e8bc4
Packit 4e8bc4
    cq_push(thread->new_conn_queue, item);
Packit 4e8bc4
Packit 4e8bc4
    MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
Packit 4e8bc4
    buf[0] = 'c';
Packit 4e8bc4
    if (write(thread->notify_send_fd, buf, 1) != 1) {
Packit 4e8bc4
        perror("Writing to thread notify pipe");
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Re-dispatches a connection back to the original thread. Can be called from
Packit 4e8bc4
 * any side thread borrowing a connection.
Packit 4e8bc4
 */
Packit 4e8bc4
void redispatch_conn(conn *c) {
Packit 4e8bc4
    CQ_ITEM *item = cqi_new();
Packit 4e8bc4
    char buf[1];
Packit 4e8bc4
    if (item == NULL) {
Packit 4e8bc4
        /* Can't cleanly redispatch connection. close it forcefully. */
Packit 4e8bc4
        c->state = conn_closed;
Packit 4e8bc4
        close(c->sfd);
Packit 4e8bc4
        return;
Packit 4e8bc4
    }
Packit 4e8bc4
    LIBEVENT_THREAD *thread = c->thread;
Packit 4e8bc4
    item->sfd = c->sfd;
Packit 4e8bc4
    item->init_state = conn_new_cmd;
Packit 4e8bc4
    item->c = c;
Packit 4e8bc4
    item->mode = queue_redispatch;
Packit 4e8bc4
Packit 4e8bc4
    cq_push(thread->new_conn_queue, item);
Packit 4e8bc4
Packit 4e8bc4
    buf[0] = 'c';
Packit 4e8bc4
    if (write(thread->notify_send_fd, buf, 1) != 1) {
Packit 4e8bc4
        perror("Writing to thread notify pipe");
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/* This misses the allow_new_conns flag :( */
Packit 4e8bc4
void sidethread_conn_close(conn *c) {
Packit 4e8bc4
    c->state = conn_closed;
Packit 4e8bc4
    if (settings.verbose > 1)
Packit 4e8bc4
        fprintf(stderr, "<%d connection closed from side thread.\n", c->sfd);
Packit 4e8bc4
#ifdef TLS
Packit 4e8bc4
    if (c->ssl) {
Packit 4e8bc4
        c->ssl_wbuf = NULL;
Packit 4e8bc4
        SSL_shutdown(c->ssl);
Packit 4e8bc4
        SSL_free(c->ssl);
Packit 4e8bc4
    }
Packit 4e8bc4
#endif
Packit 4e8bc4
    close(c->sfd);
Packit 4e8bc4
Packit 4e8bc4
    STATS_LOCK();
Packit 4e8bc4
    stats_state.curr_conns--;
Packit 4e8bc4
    STATS_UNLOCK();
Packit 4e8bc4
Packit 4e8bc4
    return;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/********************************* ITEM ACCESS *******************************/
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Allocates a new item.
Packit 4e8bc4
 */
Packit 4e8bc4
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
Packit 4e8bc4
    item *it;
Packit 4e8bc4
    /* do_item_alloc handles its own locks */
Packit 4e8bc4
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
Packit 4e8bc4
    return it;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Returns an item if it hasn't been marked as expired,
Packit 4e8bc4
 * lazy-expiring as needed.
Packit 4e8bc4
 */
Packit 4e8bc4
item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
Packit 4e8bc4
    item *it;
Packit 4e8bc4
    uint32_t hv;
Packit 4e8bc4
    hv = hash(key, nkey);
Packit 4e8bc4
    item_lock(hv);
Packit 4e8bc4
    it = do_item_get(key, nkey, hv, c, do_update);
Packit 4e8bc4
    item_unlock(hv);
Packit 4e8bc4
    return it;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
// returns an item with the item lock held.
Packit 4e8bc4
// lock will still be held even if return is NULL, allowing caller to replace
Packit 4e8bc4
// an item atomically if desired.
Packit 4e8bc4
item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv) {
Packit 4e8bc4
    item *it;
Packit 4e8bc4
    *hv = hash(key, nkey);
Packit 4e8bc4
    item_lock(*hv);
Packit 4e8bc4
    it = do_item_get(key, nkey, *hv, c, do_update);
Packit 4e8bc4
    return it;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
Packit 4e8bc4
    item *it;
Packit 4e8bc4
    uint32_t hv;
Packit 4e8bc4
    hv = hash(key, nkey);
Packit 4e8bc4
    item_lock(hv);
Packit 4e8bc4
    it = do_item_touch(key, nkey, exptime, hv, c);
Packit 4e8bc4
    item_unlock(hv);
Packit 4e8bc4
    return it;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Links an item into the LRU and hashtable.
Packit 4e8bc4
 */
Packit 4e8bc4
int item_link(item *item) {
Packit 4e8bc4
    int ret;
Packit 4e8bc4
    uint32_t hv;
Packit 4e8bc4
Packit 4e8bc4
    hv = hash(ITEM_key(item), item->nkey);
Packit 4e8bc4
    item_lock(hv);
Packit 4e8bc4
    ret = do_item_link(item, hv);
Packit 4e8bc4
    item_unlock(hv);
Packit 4e8bc4
    return ret;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Decrements the reference count on an item and adds it to the freelist if
Packit 4e8bc4
 * needed.
Packit 4e8bc4
 */
Packit 4e8bc4
void item_remove(item *item) {
Packit 4e8bc4
    uint32_t hv;
Packit 4e8bc4
    hv = hash(ITEM_key(item), item->nkey);
Packit 4e8bc4
Packit 4e8bc4
    item_lock(hv);
Packit 4e8bc4
    do_item_remove(item);
Packit 4e8bc4
    item_unlock(hv);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Replaces one item with another in the hashtable.
Packit 4e8bc4
 * Unprotected by a mutex lock since the core server does not require
Packit 4e8bc4
 * it to be thread-safe.
Packit 4e8bc4
 */
Packit 4e8bc4
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
Packit 4e8bc4
    return do_item_replace(old_it, new_it, hv);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Unlinks an item from the LRU and hashtable.
Packit 4e8bc4
 */
Packit 4e8bc4
void item_unlink(item *item) {
Packit 4e8bc4
    uint32_t hv;
Packit 4e8bc4
    hv = hash(ITEM_key(item), item->nkey);
Packit 4e8bc4
    item_lock(hv);
Packit 4e8bc4
    do_item_unlink(item, hv);
Packit 4e8bc4
    item_unlock(hv);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Does arithmetic on a numeric item value.
Packit 4e8bc4
 */
Packit 4e8bc4
enum delta_result_type add_delta(conn *c, const char *key,
Packit 4e8bc4
                                 const size_t nkey, bool incr,
Packit 4e8bc4
                                 const int64_t delta, char *buf,
Packit 4e8bc4
                                 uint64_t *cas) {
Packit 4e8bc4
    enum delta_result_type ret;
Packit 4e8bc4
    uint32_t hv;
Packit 4e8bc4
Packit 4e8bc4
    hv = hash(key, nkey);
Packit 4e8bc4
    item_lock(hv);
Packit 4e8bc4
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
Packit 4e8bc4
    item_unlock(hv);
Packit 4e8bc4
    return ret;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
Packit 4e8bc4
 */
Packit 4e8bc4
enum store_item_type store_item(item *item, int comm, conn* c) {
Packit 4e8bc4
    enum store_item_type ret;
Packit 4e8bc4
    uint32_t hv;
Packit 4e8bc4
Packit 4e8bc4
    hv = hash(ITEM_key(item), item->nkey);
Packit 4e8bc4
    item_lock(hv);
Packit 4e8bc4
    ret = do_store_item(item, comm, c, hv);
Packit 4e8bc4
    item_unlock(hv);
Packit 4e8bc4
    return ret;
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/******************************* GLOBAL STATS ******************************/
Packit 4e8bc4
Packit 4e8bc4
void STATS_LOCK() {
Packit 4e8bc4
    pthread_mutex_lock(&stats_lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void STATS_UNLOCK() {
Packit 4e8bc4
    pthread_mutex_unlock(&stats_lock);
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void threadlocal_stats_reset(void) {
Packit 4e8bc4
    int ii;
Packit 4e8bc4
    for (ii = 0; ii < settings.num_threads; ++ii) {
Packit 4e8bc4
        pthread_mutex_lock(&threads[ii].stats.mutex);
Packit 4e8bc4
#define X(name) threads[ii].stats.name = 0;
Packit 4e8bc4
        THREAD_STATS_FIELDS
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
        EXTSTORE_THREAD_STATS_FIELDS
Packit 4e8bc4
#endif
Packit 4e8bc4
#undef X
Packit 4e8bc4
Packit 4e8bc4
        memset(&threads[ii].stats.slab_stats, 0,
Packit 4e8bc4
                sizeof(threads[ii].stats.slab_stats));
Packit 4e8bc4
        memset(&threads[ii].stats.lru_hits, 0,
Packit 4e8bc4
                sizeof(uint64_t) * POWER_LARGEST);
Packit 4e8bc4
Packit 4e8bc4
        pthread_mutex_unlock(&threads[ii].stats.mutex);
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void threadlocal_stats_aggregate(struct thread_stats *stats) {
Packit 4e8bc4
    int ii, sid;
Packit 4e8bc4
Packit 4e8bc4
    /* The struct has a mutex, but we can safely set the whole thing
Packit 4e8bc4
     * to zero since it is unused when aggregating. */
Packit 4e8bc4
    memset(stats, 0, sizeof(*stats));
Packit 4e8bc4
Packit 4e8bc4
    for (ii = 0; ii < settings.num_threads; ++ii) {
Packit 4e8bc4
        pthread_mutex_lock(&threads[ii].stats.mutex);
Packit 4e8bc4
#define X(name) stats->name += threads[ii].stats.name;
Packit 4e8bc4
        THREAD_STATS_FIELDS
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
        EXTSTORE_THREAD_STATS_FIELDS
Packit 4e8bc4
#endif
Packit 4e8bc4
#undef X
Packit 4e8bc4
Packit 4e8bc4
        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
Packit 4e8bc4
#define X(name) stats->slab_stats[sid].name += \
Packit 4e8bc4
            threads[ii].stats.slab_stats[sid].name;
Packit 4e8bc4
            SLAB_STATS_FIELDS
Packit 4e8bc4
#undef X
Packit 4e8bc4
        }
Packit 4e8bc4
Packit 4e8bc4
        for (sid = 0; sid < POWER_LARGEST; sid++) {
Packit 4e8bc4
            stats->lru_hits[sid] +=
Packit 4e8bc4
                threads[ii].stats.lru_hits[sid];
Packit 4e8bc4
            stats->slab_stats[CLEAR_LRU(sid)].get_hits +=
Packit 4e8bc4
                threads[ii].stats.lru_hits[sid];
Packit 4e8bc4
        }
Packit 4e8bc4
Packit 4e8bc4
        pthread_mutex_unlock(&threads[ii].stats.mutex);
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
Packit 4e8bc4
    int sid;
Packit 4e8bc4
Packit 4e8bc4
    memset(out, 0, sizeof(*out));
Packit 4e8bc4
Packit 4e8bc4
    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
Packit 4e8bc4
#define X(name) out->name += stats->slab_stats[sid].name;
Packit 4e8bc4
        SLAB_STATS_FIELDS
Packit 4e8bc4
#undef X
Packit 4e8bc4
    }
Packit 4e8bc4
}
Packit 4e8bc4
Packit 4e8bc4
/*
Packit 4e8bc4
 * Initializes the thread subsystem, creating various worker threads.
Packit 4e8bc4
 *
Packit 4e8bc4
 * nthreads  Number of worker event handler threads to spawn
Packit 4e8bc4
 */
Packit 4e8bc4
void memcached_thread_init(int nthreads, void *arg) {
Packit 4e8bc4
    int         i;
Packit 4e8bc4
    int         power;
Packit 4e8bc4
Packit 4e8bc4
    for (i = 0; i < POWER_LARGEST; i++) {
Packit 4e8bc4
        pthread_mutex_init(&lru_locks[i], NULL);
Packit 4e8bc4
    }
Packit 4e8bc4
    pthread_mutex_init(&worker_hang_lock, NULL);
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_init(&init_lock, NULL);
Packit 4e8bc4
    pthread_cond_init(&init_cond, NULL);
Packit 4e8bc4
Packit 4e8bc4
    pthread_mutex_init(&cqi_freelist_lock, NULL);
Packit 4e8bc4
    cqi_freelist = NULL;
Packit 4e8bc4
Packit 4e8bc4
    /* Want a wide lock table, but don't waste memory */
Packit 4e8bc4
    if (nthreads < 3) {
Packit 4e8bc4
        power = 10;
Packit 4e8bc4
    } else if (nthreads < 4) {
Packit 4e8bc4
        power = 11;
Packit 4e8bc4
    } else if (nthreads < 5) {
Packit 4e8bc4
        power = 12;
Packit 4e8bc4
    } else if (nthreads <= 10) {
Packit 4e8bc4
        power = 13;
Packit 4e8bc4
    } else if (nthreads <= 20) {
Packit 4e8bc4
        power = 14;
Packit 4e8bc4
    } else {
Packit 4e8bc4
        /* 32k buckets. just under the hashpower default. */
Packit 4e8bc4
        power = 15;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    if (power >= hashpower) {
Packit 4e8bc4
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
Packit 4e8bc4
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
Packit 4e8bc4
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
Packit 4e8bc4
        exit(1);
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    item_lock_count = hashsize(power);
Packit 4e8bc4
    item_lock_hashpower = power;
Packit 4e8bc4
Packit 4e8bc4
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
Packit 4e8bc4
    if (! item_locks) {
Packit 4e8bc4
        perror("Can't allocate item locks");
Packit 4e8bc4
        exit(1);
Packit 4e8bc4
    }
Packit 4e8bc4
    for (i = 0; i < item_lock_count; i++) {
Packit 4e8bc4
        pthread_mutex_init(&item_locks[i], NULL);
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
Packit 4e8bc4
    if (! threads) {
Packit 4e8bc4
        perror("Can't allocate thread descriptors");
Packit 4e8bc4
        exit(1);
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    for (i = 0; i < nthreads; i++) {
Packit 4e8bc4
        int fds[2];
Packit 4e8bc4
        if (pipe(fds)) {
Packit 4e8bc4
            perror("Can't create notify pipe");
Packit 4e8bc4
            exit(1);
Packit 4e8bc4
        }
Packit 4e8bc4
Packit 4e8bc4
        threads[i].notify_receive_fd = fds[0];
Packit 4e8bc4
        threads[i].notify_send_fd = fds[1];
Packit 4e8bc4
#ifdef EXTSTORE
Packit 4e8bc4
        threads[i].storage = arg;
Packit 4e8bc4
#endif
Packit 4e8bc4
        setup_thread(&threads[i]);
Packit 4e8bc4
        /* Reserve three fds for the libevent base, and two for the pipe */
Packit 4e8bc4
        stats_state.reserved_fds += 5;
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    /* Create threads after we've done all the libevent setup. */
Packit 4e8bc4
    for (i = 0; i < nthreads; i++) {
Packit 4e8bc4
        create_worker(worker_libevent, &threads[i]);
Packit 4e8bc4
    }
Packit 4e8bc4
Packit 4e8bc4
    /* Wait for all the threads to set themselves up before returning. */
Packit 4e8bc4
    pthread_mutex_lock(&init_lock);
Packit 4e8bc4
    wait_for_thread_registration(nthreads);
Packit 4e8bc4
    pthread_mutex_unlock(&init_lock);
Packit 4e8bc4
}
Packit 4e8bc4