Blame thread.c

Packit Service 584ef9
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
Packit Service 584ef9
/*
Packit Service 584ef9
 * Thread management for memcached.
Packit Service 584ef9
 */
Packit Service 584ef9
#include "memcached.h"
Packit Service 584ef9
#ifdef EXTSTORE
Packit Service 584ef9
#include "storage.h"
Packit Service 584ef9
#endif
Packit Service 584ef9
#include <assert.h>
Packit Service 584ef9
#include <stdio.h>
Packit Service 584ef9
#include <errno.h>
Packit Service 584ef9
#include <stdlib.h>
Packit Service 584ef9
#include <string.h>
Packit Service 584ef9
#include <pthread.h>
Packit Service 584ef9
Packit Service 584ef9
#ifdef __sun
Packit Service 584ef9
#include <atomic.h>
Packit Service 584ef9
#endif
Packit Service 584ef9
Packit Service 584ef9
#ifdef TLS
Packit Service 584ef9
#include <openssl/ssl.h>
Packit Service 584ef9
#endif
Packit Service 584ef9
Packit Service 584ef9
/* Number of CQ_ITEMs requested from malloc() at once by cqi_new() when the free list is empty. */
#define ITEMS_PER_ALLOC 64
Packit Service 584ef9
Packit Service 584ef9
/* An item in the connection queue. */
Packit Service 584ef9
/* Tells the receiving worker how to handle a queued CQ_ITEM. */
enum conn_queue_item_modes {
    /* brand new connection. */
    queue_new_conn,
    /* redispatching from side thread */
    queue_redispatch
};
Packit Service 584ef9
typedef struct conn_queue_item CQ_ITEM;
Packit Service 584ef9
struct conn_queue_item {
Packit Service 584ef9
    int               sfd;
Packit Service 584ef9
    enum conn_states  init_state;
Packit Service 584ef9
    int               event_flags;
Packit Service 584ef9
    int               read_buffer_size;
Packit Service 584ef9
    enum network_transport     transport;
Packit Service 584ef9
    enum conn_queue_item_modes mode;
Packit Service 584ef9
    conn *c;
Packit Service 584ef9
    void    *ssl;
Packit Service 584ef9
    CQ_ITEM          *next;
Packit Service 584ef9
};
Packit Service 584ef9
Packit Service 584ef9
/* A connection queue. */
Packit Service 584ef9
typedef struct conn_queue CQ;
Packit Service 584ef9
struct conn_queue {
Packit Service 584ef9
    CQ_ITEM *head;
Packit Service 584ef9
    CQ_ITEM *tail;
Packit Service 584ef9
    pthread_mutex_t lock;
Packit Service 584ef9
};
Packit Service 584ef9
Packit Service 584ef9
/* Locks for cache LRU operations */
Packit Service 584ef9
pthread_mutex_t lru_locks[POWER_LARGEST];
Packit Service 584ef9
Packit Service 584ef9
/* Connection lock around accepting new connections */
Packit Service 584ef9
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
Packit Service 584ef9
Packit Service 584ef9
#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
Packit Service 584ef9
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
Packit Service 584ef9
#endif
Packit Service 584ef9
Packit Service 584ef9
/* Lock for global stats */
Packit Service 584ef9
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
Packit Service 584ef9
Packit Service 584ef9
/* Lock to cause worker threads to hang up after being woken */
Packit Service 584ef9
static pthread_mutex_t worker_hang_lock;
Packit Service 584ef9
Packit Service 584ef9
/* Free list of CQ_ITEM structs */
Packit Service 584ef9
static CQ_ITEM *cqi_freelist;
Packit Service 584ef9
static pthread_mutex_t cqi_freelist_lock;
Packit Service 584ef9
Packit Service 584ef9
static pthread_mutex_t *item_locks;
Packit Service 584ef9
/* size of the item lock hash table */
Packit Service 584ef9
static uint32_t item_lock_count;
Packit Service 584ef9
unsigned int item_lock_hashpower;
Packit Service 584ef9
#define hashsize(n) ((unsigned long int)1<<(n))
Packit Service 584ef9
#define hashmask(n) (hashsize(n)-1)
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Each libevent instance has a wakeup pipe, which other threads
Packit Service 584ef9
 * can use to signal that they've put a new connection on its queue.
Packit Service 584ef9
 */
Packit Service 584ef9
static LIBEVENT_THREAD *threads;
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Number of worker threads that have finished setting themselves up.
Packit Service 584ef9
 */
Packit Service 584ef9
static int init_count = 0;
Packit Service 584ef9
static pthread_mutex_t init_lock;
Packit Service 584ef9
static pthread_cond_t init_cond;
Packit Service 584ef9
Packit Service 584ef9
Packit Service 584ef9
static void thread_libevent_process(int fd, short which, void *arg);
Packit Service 584ef9
Packit Service 584ef9
/* item_lock() must be held for an item before any modifications to either its
Packit Service 584ef9
 * associated hash bucket, or the structure itself.
Packit Service 584ef9
 * LRU modifications must hold the item lock, and the LRU lock.
Packit Service 584ef9
 * LRU code accessing items must item_trylock() before modifying an item.
Packit Service 584ef9
 * Items accessible from an LRU must not be freed or modified
Packit Service 584ef9
 * without first locking and removing from the LRU.
Packit Service 584ef9
 */
Packit Service 584ef9
Packit Service 584ef9
void item_lock(uint32_t hv) {
Packit Service 584ef9
    mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
void *item_trylock(uint32_t hv) {
Packit Service 584ef9
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
Packit Service 584ef9
    if (pthread_mutex_trylock(lock) == 0) {
Packit Service 584ef9
        return lock;
Packit Service 584ef9
    }
Packit Service 584ef9
    return NULL;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/* Releases a lock previously returned by item_trylock(). */
void item_trylock_unlock(void *lock) {
    pthread_mutex_t *held = (pthread_mutex_t *) lock;

    mutex_unlock(held);
}
Packit Service 584ef9
Packit Service 584ef9
void item_unlock(uint32_t hv) {
Packit Service 584ef9
    mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
static void wait_for_thread_registration(int nthreads) {
Packit Service 584ef9
    while (init_count < nthreads) {
Packit Service 584ef9
        pthread_cond_wait(&init_cond, &init_lock);
Packit Service 584ef9
    }
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
static void register_thread_initialized(void) {
Packit Service 584ef9
    pthread_mutex_lock(&init_lock);
Packit Service 584ef9
    init_count++;
Packit Service 584ef9
    pthread_cond_signal(&init_cond);
Packit Service 584ef9
    pthread_mutex_unlock(&init_lock);
Packit Service 584ef9
    /* Force worker threads to pile up if someone wants us to */
Packit Service 584ef9
    pthread_mutex_lock(&worker_hang_lock);
Packit Service 584ef9
    pthread_mutex_unlock(&worker_hang_lock);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/* Must not be called with any deeper locks held */
Packit Service 584ef9
void pause_threads(enum pause_thread_types type) {
Packit Service 584ef9
    char buf[1];
Packit Service 584ef9
    int i;
Packit Service 584ef9
Packit Service 584ef9
    buf[0] = 0;
Packit Service 584ef9
    switch (type) {
Packit Service 584ef9
        case PAUSE_ALL_THREADS:
Packit Service 584ef9
            slabs_rebalancer_pause();
Packit Service 584ef9
            lru_maintainer_pause();
Packit Service 584ef9
            lru_crawler_pause();
Packit Service 584ef9
#ifdef EXTSTORE
Packit Service 584ef9
            storage_compact_pause();
Packit Service 584ef9
            storage_write_pause();
Packit Service 584ef9
#endif
Packit Service 584ef9
        case PAUSE_WORKER_THREADS:
Packit Service 584ef9
            buf[0] = 'p';
Packit Service 584ef9
            pthread_mutex_lock(&worker_hang_lock);
Packit Service 584ef9
            break;
Packit Service 584ef9
        case RESUME_ALL_THREADS:
Packit Service 584ef9
            slabs_rebalancer_resume();
Packit Service 584ef9
            lru_maintainer_resume();
Packit Service 584ef9
            lru_crawler_resume();
Packit Service 584ef9
#ifdef EXTSTORE
Packit Service 584ef9
            storage_compact_resume();
Packit Service 584ef9
            storage_write_resume();
Packit Service 584ef9
#endif
Packit Service 584ef9
        case RESUME_WORKER_THREADS:
Packit Service 584ef9
            pthread_mutex_unlock(&worker_hang_lock);
Packit Service 584ef9
            break;
Packit Service 584ef9
        default:
Packit Service 584ef9
            fprintf(stderr, "Unknown lock type: %d\n", type);
Packit Service 584ef9
            assert(1 == 0);
Packit Service 584ef9
            break;
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    /* Only send a message if we have one. */
Packit Service 584ef9
    if (buf[0] == 0) {
Packit Service 584ef9
        return;
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    pthread_mutex_lock(&init_lock);
Packit Service 584ef9
    init_count = 0;
Packit Service 584ef9
    for (i = 0; i < settings.num_threads; i++) {
Packit Service 584ef9
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
Packit Service 584ef9
            perror("Failed writing to notify pipe");
Packit Service 584ef9
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
Packit Service 584ef9
        }
Packit Service 584ef9
    }
Packit Service 584ef9
    wait_for_thread_registration(settings.num_threads);
Packit Service 584ef9
    pthread_mutex_unlock(&init_lock);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
// MUST not be called with any deeper locks held
Packit Service 584ef9
// MUST be called only by parent thread
Packit Service 584ef9
// Note: listener thread is the "main" event base, which has exited its
Packit Service 584ef9
// loop in order to call this function.
Packit Service 584ef9
void stop_threads(void) {
Packit Service 584ef9
    char buf[1];
Packit Service 584ef9
    int i;
Packit Service 584ef9
Packit Service 584ef9
    // assoc can call pause_threads(), so we have to stop it first.
Packit Service 584ef9
    stop_assoc_maintenance_thread();
Packit Service 584ef9
    if (settings.verbose > 0)
Packit Service 584ef9
        fprintf(stderr, "stopped assoc\n");
Packit Service 584ef9
Packit Service 584ef9
    if (settings.verbose > 0)
Packit Service 584ef9
        fprintf(stderr, "asking workers to stop\n");
Packit Service 584ef9
    buf[0] = 's';
Packit Service 535ea4
    pthread_mutex_lock(&worker_hang_lock);
Packit Service 584ef9
    pthread_mutex_lock(&init_lock);
Packit Service 584ef9
    init_count = 0;
Packit Service 584ef9
    for (i = 0; i < settings.num_threads; i++) {
Packit Service 584ef9
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
Packit Service 584ef9
            perror("Failed writing to notify pipe");
Packit Service 584ef9
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
Packit Service 584ef9
        }
Packit Service 584ef9
    }
Packit Service 584ef9
    wait_for_thread_registration(settings.num_threads);
Packit Service 584ef9
    pthread_mutex_unlock(&init_lock);
Packit Service 584ef9
Packit Service 535ea4
    // All of the workers are hung but haven't done cleanup yet.
Packit Service 535ea4
Packit Service 584ef9
    if (settings.verbose > 0)
Packit Service 584ef9
        fprintf(stderr, "asking background threads to stop\n");
Packit Service 584ef9
Packit Service 584ef9
    // stop each side thread.
Packit Service 584ef9
    // TODO: Verify these all work if the threads are already stopped
Packit Service 584ef9
    stop_item_crawler_thread(CRAWLER_WAIT);
Packit Service 584ef9
    if (settings.verbose > 0)
Packit Service 584ef9
        fprintf(stderr, "stopped lru crawler\n");
Packit Service a00917
    if (settings.lru_maintainer_thread) {
Packit Service a00917
        stop_lru_maintainer_thread();
Packit Service a00917
        if (settings.verbose > 0)
Packit Service a00917
            fprintf(stderr, "stopped maintainer\n");
Packit Service a00917
    }
Packit Service a00917
    if (settings.slab_reassign) {
Packit Service a00917
        stop_slab_maintenance_thread();
Packit Service a00917
        if (settings.verbose > 0)
Packit Service a00917
            fprintf(stderr, "stopped slab mover\n");
Packit Service a00917
    }
Packit Service 584ef9
    logger_stop();
Packit Service 584ef9
    if (settings.verbose > 0)
Packit Service 584ef9
        fprintf(stderr, "stopped logger thread\n");
Packit Service 584ef9
    stop_conn_timeout_thread();
Packit Service 584ef9
    if (settings.verbose > 0)
Packit Service 584ef9
        fprintf(stderr, "stopped idle timeout thread\n");
Packit Service 584ef9
Packit Service 535ea4
    // Close all connections then let the workers finally exit.
Packit Service 535ea4
    if (settings.verbose > 0)
Packit Service 535ea4
        fprintf(stderr, "closing connections\n");
Packit Service 535ea4
    conn_close_all();
Packit Service 535ea4
    pthread_mutex_unlock(&worker_hang_lock);
Packit Service 535ea4
    if (settings.verbose > 0)
Packit Service 535ea4
        fprintf(stderr, "reaping worker threads\n");
Packit Service 535ea4
    for (i = 0; i < settings.num_threads; i++) {
Packit Service 535ea4
        pthread_join(threads[i].thread_id, NULL);
Packit Service 535ea4
    }
Packit Service 535ea4
Packit Service 584ef9
    if (settings.verbose > 0)
Packit Service 584ef9
        fprintf(stderr, "all background threads stopped\n");
Packit Service 584ef9
Packit Service 584ef9
    // At this point, every background thread must be stopped.
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Initializes a connection queue.
Packit Service 584ef9
 */
Packit Service 584ef9
static void cq_init(CQ *cq) {
Packit Service 584ef9
    pthread_mutex_init(&cq->lock, NULL);
Packit Service 584ef9
    cq->head = NULL;
Packit Service 584ef9
    cq->tail = NULL;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Looks for an item on a connection queue, but doesn't block if there isn't
Packit Service 584ef9
 * one.
Packit Service 584ef9
 * Returns the item, or NULL if no item is available
Packit Service 584ef9
 */
Packit Service 584ef9
static CQ_ITEM *cq_pop(CQ *cq) {
Packit Service 584ef9
    CQ_ITEM *item;
Packit Service 584ef9
Packit Service 584ef9
    pthread_mutex_lock(&cq->lock);
Packit Service 584ef9
    item = cq->head;
Packit Service 584ef9
    if (NULL != item) {
Packit Service 584ef9
        cq->head = item->next;
Packit Service 584ef9
        if (NULL == cq->head)
Packit Service 584ef9
            cq->tail = NULL;
Packit Service 584ef9
    }
Packit Service 584ef9
    pthread_mutex_unlock(&cq->lock);
Packit Service 584ef9
Packit Service 584ef9
    return item;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Adds an item to a connection queue.
Packit Service 584ef9
 */
Packit Service 584ef9
static void cq_push(CQ *cq, CQ_ITEM *item) {
Packit Service 584ef9
    item->next = NULL;
Packit Service 584ef9
Packit Service 584ef9
    pthread_mutex_lock(&cq->lock);
Packit Service 584ef9
    if (NULL == cq->tail)
Packit Service 584ef9
        cq->head = item;
Packit Service 584ef9
    else
Packit Service 584ef9
        cq->tail->next = item;
Packit Service 584ef9
    cq->tail = item;
Packit Service 584ef9
    pthread_mutex_unlock(&cq->lock);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Returns a fresh connection queue item.
Packit Service 584ef9
 */
Packit Service 584ef9
static CQ_ITEM *cqi_new(void) {
Packit Service 584ef9
    CQ_ITEM *item = NULL;
Packit Service 584ef9
    pthread_mutex_lock(&cqi_freelist_lock);
Packit Service 584ef9
    if (cqi_freelist) {
Packit Service 584ef9
        item = cqi_freelist;
Packit Service 584ef9
        cqi_freelist = item->next;
Packit Service 584ef9
    }
Packit Service 584ef9
    pthread_mutex_unlock(&cqi_freelist_lock);
Packit Service 584ef9
Packit Service 584ef9
    if (NULL == item) {
Packit Service 584ef9
        int i;
Packit Service 584ef9
Packit Service 584ef9
        /* Allocate a bunch of items at once to reduce fragmentation */
Packit Service 584ef9
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
Packit Service 584ef9
        if (NULL == item) {
Packit Service 584ef9
            STATS_LOCK();
Packit Service 584ef9
            stats.malloc_fails++;
Packit Service 584ef9
            STATS_UNLOCK();
Packit Service 584ef9
            return NULL;
Packit Service 584ef9
        }
Packit Service 584ef9
Packit Service 584ef9
        /*
Packit Service 584ef9
         * Link together all the new items except the first one
Packit Service 584ef9
         * (which we'll return to the caller) for placement on
Packit Service 584ef9
         * the freelist.
Packit Service 584ef9
         */
Packit Service 584ef9
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
Packit Service 584ef9
            item[i - 1].next = &item[i];
Packit Service 584ef9
Packit Service 584ef9
        pthread_mutex_lock(&cqi_freelist_lock);
Packit Service 584ef9
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
Packit Service 584ef9
        cqi_freelist = &item[1];
Packit Service 584ef9
        pthread_mutex_unlock(&cqi_freelist_lock);
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    return item;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Frees a connection queue item (adds it to the freelist.)
Packit Service 584ef9
 */
Packit Service 584ef9
static void cqi_free(CQ_ITEM *item) {
Packit Service 584ef9
    pthread_mutex_lock(&cqi_freelist_lock);
Packit Service 584ef9
    item->next = cqi_freelist;
Packit Service 584ef9
    cqi_freelist = item;
Packit Service 584ef9
    pthread_mutex_unlock(&cqi_freelist_lock);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Creates a worker thread.
Packit Service 584ef9
 */
Packit Service 584ef9
static void create_worker(void *(*func)(void *), void *arg) {
Packit Service 584ef9
    pthread_attr_t  attr;
Packit Service 584ef9
    int             ret;
Packit Service 584ef9
Packit Service 584ef9
    pthread_attr_init(&attr);
Packit Service 584ef9
Packit Service 584ef9
    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
Packit Service 584ef9
        fprintf(stderr, "Can't create thread: %s\n",
Packit Service 584ef9
                strerror(ret));
Packit Service 584ef9
        exit(1);
Packit Service 584ef9
    }
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Sets whether or not we accept new connections.
Packit Service 584ef9
 */
Packit Service 584ef9
void accept_new_conns(const bool do_accept) {
Packit Service 584ef9
    pthread_mutex_lock(&conn_lock);
Packit Service 584ef9
    do_accept_new_conns(do_accept);
Packit Service 584ef9
    pthread_mutex_unlock(&conn_lock);
Packit Service 584ef9
}
Packit Service 584ef9
/****************************** LIBEVENT THREADS *****************************/
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Set up a thread's information.
Packit Service 584ef9
 */
Packit Service 584ef9
static void setup_thread(LIBEVENT_THREAD *me) {
Packit Service 584ef9
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
Packit Service 584ef9
    struct event_config *ev_config;
Packit Service 584ef9
    ev_config = event_config_new();
Packit Service 584ef9
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
Packit Service 584ef9
    me->base = event_base_new_with_config(ev_config);
Packit Service 584ef9
    event_config_free(ev_config);
Packit Service 584ef9
#else
Packit Service 584ef9
    me->base = event_init();
Packit Service 584ef9
#endif
Packit Service 584ef9
Packit Service 584ef9
    if (! me->base) {
Packit Service 584ef9
        fprintf(stderr, "Can't allocate event base\n");
Packit Service 584ef9
        exit(1);
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    /* Listen for notifications from other threads */
Packit Service 584ef9
    event_set(&me->notify_event, me->notify_receive_fd,
Packit Service 584ef9
              EV_READ | EV_PERSIST, thread_libevent_process, me);
Packit Service 584ef9
    event_base_set(me->base, &me->notify_event);
Packit Service 584ef9
Packit Service 584ef9
    if (event_add(&me->notify_event, 0) == -1) {
Packit Service 584ef9
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
Packit Service 584ef9
        exit(1);
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
Packit Service 584ef9
    if (me->new_conn_queue == NULL) {
Packit Service 584ef9
        perror("Failed to allocate memory for connection queue");
Packit Service 584ef9
        exit(EXIT_FAILURE);
Packit Service 584ef9
    }
Packit Service 584ef9
    cq_init(me->new_conn_queue);
Packit Service 584ef9
Packit Service 584ef9
    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
Packit Service 584ef9
        perror("Failed to initialize mutex");
Packit Service 584ef9
        exit(EXIT_FAILURE);
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
Packit Service 584ef9
                                    NULL, NULL);
Packit Service 584ef9
    if (me->suffix_cache == NULL) {
Packit Service 584ef9
        fprintf(stderr, "Failed to create suffix cache\n");
Packit Service 584ef9
        exit(EXIT_FAILURE);
Packit Service 584ef9
    }
Packit Service 584ef9
#ifdef EXTSTORE
Packit Service 584ef9
    me->io_cache = cache_create("io", sizeof(io_wrap), sizeof(char*), NULL, NULL);
Packit Service 584ef9
    if (me->io_cache == NULL) {
Packit Service 584ef9
        fprintf(stderr, "Failed to create IO object cache\n");
Packit Service 584ef9
        exit(EXIT_FAILURE);
Packit Service 584ef9
    }
Packit Service 584ef9
#endif
Packit Service 584ef9
#ifdef TLS
Packit Service 584ef9
    if (settings.ssl_enabled) {
Packit Service 584ef9
        me->ssl_wbuf = (char *)malloc((size_t)settings.ssl_wbuf_size);
Packit Service 584ef9
        if (me->ssl_wbuf == NULL) {
Packit Service 584ef9
            fprintf(stderr, "Failed to allocate the SSL write buffer\n");
Packit Service 584ef9
            exit(EXIT_FAILURE);
Packit Service 584ef9
        }
Packit Service 584ef9
    }
Packit Service 584ef9
#endif
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Worker thread: main event loop
Packit Service 584ef9
 */
Packit Service 584ef9
static void *worker_libevent(void *arg) {
Packit Service 584ef9
    LIBEVENT_THREAD *me = arg;
Packit Service 584ef9
Packit Service 584ef9
    /* Any per-thread setup can happen here; memcached_thread_init() will block until
Packit Service 584ef9
     * all threads have finished initializing.
Packit Service 584ef9
     */
Packit Service 584ef9
    me->l = logger_create();
Packit Service 584ef9
    me->lru_bump_buf = item_lru_bump_buf_create();
Packit Service 584ef9
    if (me->l == NULL || me->lru_bump_buf == NULL) {
Packit Service 584ef9
        abort();
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    if (settings.drop_privileges) {
Packit Service 584ef9
        drop_worker_privileges();
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    register_thread_initialized();
Packit Service 584ef9
Packit Service 584ef9
    event_base_loop(me->base, 0);
Packit Service 584ef9
Packit Service 584ef9
    // same mechanism used to watch for all threads exiting.
Packit Service 584ef9
    register_thread_initialized();
Packit Service 584ef9
Packit Service 584ef9
    event_base_free(me->base);
Packit Service 584ef9
    return NULL;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Processes an incoming "handle a new connection" item. This is called when
Packit Service 584ef9
 * input arrives on the libevent wakeup pipe.
Packit Service 584ef9
 */
Packit Service 584ef9
static void thread_libevent_process(int fd, short which, void *arg) {
Packit Service 584ef9
    LIBEVENT_THREAD *me = arg;
Packit Service 584ef9
    CQ_ITEM *item;
Packit Service 584ef9
    char buf[1];
Packit Service 584ef9
    conn *c;
Packit Service 584ef9
    unsigned int timeout_fd;
Packit Service 584ef9
Packit Service 584ef9
    if (read(fd, buf, 1) != 1) {
Packit Service 584ef9
        if (settings.verbose > 0)
Packit Service 584ef9
            fprintf(stderr, "Can't read from libevent pipe\n");
Packit Service 584ef9
        return;
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    switch (buf[0]) {
Packit Service 584ef9
    case 'c':
Packit Service 584ef9
        item = cq_pop(me->new_conn_queue);
Packit Service 584ef9
Packit Service 584ef9
        if (NULL == item) {
Packit Service 584ef9
            break;
Packit Service 584ef9
        }
Packit Service 584ef9
        switch (item->mode) {
Packit Service 584ef9
            case queue_new_conn:
Packit Service 584ef9
                c = conn_new(item->sfd, item->init_state, item->event_flags,
Packit Service 584ef9
                                   item->read_buffer_size, item->transport,
Packit Service 584ef9
                                   me->base, item->ssl);
Packit Service 584ef9
                if (c == NULL) {
Packit Service 584ef9
                    if (IS_UDP(item->transport)) {
Packit Service 584ef9
                        fprintf(stderr, "Can't listen for events on UDP socket\n");
Packit Service 584ef9
                        exit(1);
Packit Service 584ef9
                    } else {
Packit Service 584ef9
                        if (settings.verbose > 0) {
Packit Service 584ef9
                            fprintf(stderr, "Can't listen for events on fd %d\n",
Packit Service 584ef9
                                item->sfd);
Packit Service 584ef9
                        }
Packit Service 584ef9
#ifdef TLS
Packit Service 584ef9
                        if (item->ssl) {
Packit Service 584ef9
                            SSL_shutdown(item->ssl);
Packit Service 584ef9
                            SSL_free(item->ssl);
Packit Service 584ef9
                        }
Packit Service 584ef9
#endif
Packit Service 584ef9
                        close(item->sfd);
Packit Service 584ef9
                    }
Packit Service 584ef9
                } else {
Packit Service 584ef9
                    c->thread = me;
Packit Service 584ef9
#ifdef TLS
Packit Service 584ef9
                    if (settings.ssl_enabled && c->ssl != NULL) {
Packit Service 584ef9
                        assert(c->thread && c->thread->ssl_wbuf);
Packit Service 584ef9
                        c->ssl_wbuf = c->thread->ssl_wbuf;
Packit Service 584ef9
                    }
Packit Service 584ef9
#endif
Packit Service 584ef9
                }
Packit Service 584ef9
                break;
Packit Service 584ef9
Packit Service 584ef9
            case queue_redispatch:
Packit Service 584ef9
                conn_worker_readd(item->c);
Packit Service 584ef9
                break;
Packit Service 584ef9
        }
Packit Service 584ef9
        cqi_free(item);
Packit Service 584ef9
        break;
Packit Service 584ef9
    /* we were told to pause and report in */
Packit Service 584ef9
    case 'p':
Packit Service 584ef9
        register_thread_initialized();
Packit Service 584ef9
        break;
Packit Service 584ef9
    /* a client socket timed out */
Packit Service 584ef9
    case 't':
Packit Service 584ef9
        if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
Packit Service 584ef9
            if (settings.verbose > 0)
Packit Service 584ef9
                fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
Packit Service 584ef9
            return;
Packit Service 584ef9
        }
Packit Service 584ef9
        conn_close_idle(conns[timeout_fd]);
Packit Service 584ef9
        break;
Packit Service 584ef9
    /* asked to stop */
Packit Service 584ef9
    case 's':
Packit Service 584ef9
        event_base_loopexit(me->base, NULL);
Packit Service 584ef9
        break;
Packit Service 584ef9
    }
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/* Which thread we assigned a connection to most recently. */
Packit Service 584ef9
/* Starts at -1 so that the first dispatch_conn_new() call, which uses
 * (last_thread + 1) % settings.num_threads, selects thread 0. */
static int last_thread = -1;
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Dispatches a new connection to another thread. This is only ever called
Packit Service 584ef9
 * from the main thread, either during initialization (for UDP) or because
Packit Service 584ef9
 * of an incoming connection.
Packit Service 584ef9
 */
Packit Service 584ef9
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
Packit Service 584ef9
                       int read_buffer_size, enum network_transport transport, void *ssl) {
Packit Service 584ef9
    CQ_ITEM *item = cqi_new();
Packit Service 584ef9
    char buf[1];
Packit Service 584ef9
    if (item == NULL) {
Packit Service 584ef9
        close(sfd);
Packit Service 584ef9
        /* given that malloc failed this may also fail, but let's try */
Packit Service 584ef9
        fprintf(stderr, "Failed to allocate memory for connection object\n");
Packit Service 584ef9
        return;
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    int tid = (last_thread + 1) % settings.num_threads;
Packit Service 584ef9
Packit Service 584ef9
    LIBEVENT_THREAD *thread = threads + tid;
Packit Service 584ef9
Packit Service 584ef9
    last_thread = tid;
Packit Service 584ef9
Packit Service 584ef9
    item->sfd = sfd;
Packit Service 584ef9
    item->init_state = init_state;
Packit Service 584ef9
    item->event_flags = event_flags;
Packit Service 584ef9
    item->read_buffer_size = read_buffer_size;
Packit Service 584ef9
    item->transport = transport;
Packit Service 584ef9
    item->mode = queue_new_conn;
Packit Service 584ef9
    item->ssl = ssl;
Packit Service 584ef9
Packit Service 584ef9
    cq_push(thread->new_conn_queue, item);
Packit Service 584ef9
Packit Service 584ef9
    MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
Packit Service 584ef9
    buf[0] = 'c';
Packit Service 584ef9
    if (write(thread->notify_send_fd, buf, 1) != 1) {
Packit Service 584ef9
        perror("Writing to thread notify pipe");
Packit Service 584ef9
    }
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Re-dispatches a connection back to the original thread. Can be called from
Packit Service 584ef9
 * any side thread borrowing a connection.
Packit Service 584ef9
 */
Packit Service 584ef9
void redispatch_conn(conn *c) {
Packit Service 584ef9
    CQ_ITEM *item = cqi_new();
Packit Service 584ef9
    char buf[1];
Packit Service 584ef9
    if (item == NULL) {
Packit Service 584ef9
        /* Can't cleanly redispatch connection. close it forcefully. */
Packit Service 584ef9
        c->state = conn_closed;
Packit Service 584ef9
        close(c->sfd);
Packit Service 584ef9
        return;
Packit Service 584ef9
    }
Packit Service 584ef9
    LIBEVENT_THREAD *thread = c->thread;
Packit Service 584ef9
    item->sfd = c->sfd;
Packit Service 584ef9
    item->init_state = conn_new_cmd;
Packit Service 584ef9
    item->c = c;
Packit Service 584ef9
    item->mode = queue_redispatch;
Packit Service 584ef9
Packit Service 584ef9
    cq_push(thread->new_conn_queue, item);
Packit Service 584ef9
Packit Service 584ef9
    buf[0] = 'c';
Packit Service 584ef9
    if (write(thread->notify_send_fd, buf, 1) != 1) {
Packit Service 584ef9
        perror("Writing to thread notify pipe");
Packit Service 584ef9
    }
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/* This misses the allow_new_conns flag :( */
Packit Service 584ef9
void sidethread_conn_close(conn *c) {
Packit Service 584ef9
    c->state = conn_closed;
Packit Service 584ef9
    if (settings.verbose > 1)
Packit Service 584ef9
        fprintf(stderr, "<%d connection closed from side thread.\n", c->sfd);
Packit Service 584ef9
#ifdef TLS
Packit Service 584ef9
    if (c->ssl) {
Packit Service 584ef9
        c->ssl_wbuf = NULL;
Packit Service 584ef9
        SSL_shutdown(c->ssl);
Packit Service 584ef9
        SSL_free(c->ssl);
Packit Service 584ef9
    }
Packit Service 584ef9
#endif
Packit Service 584ef9
    close(c->sfd);
Packit Service 584ef9
Packit Service 584ef9
    STATS_LOCK();
Packit Service 584ef9
    stats_state.curr_conns--;
Packit Service 584ef9
    STATS_UNLOCK();
Packit Service 584ef9
Packit Service 584ef9
    return;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/********************************* ITEM ACCESS *******************************/
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Allocates a new item.
Packit Service 584ef9
 */
Packit Service 584ef9
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
Packit Service 584ef9
    item *it;
Packit Service 584ef9
    /* do_item_alloc handles its own locks */
Packit Service 584ef9
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
Packit Service 584ef9
    return it;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Returns an item if it hasn't been marked as expired,
Packit Service 584ef9
 * lazy-expiring as needed.
Packit Service 584ef9
 */
Packit Service 584ef9
item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
Packit Service 584ef9
    item *it;
Packit Service 584ef9
    uint32_t hv;
Packit Service 584ef9
    hv = hash(key, nkey);
Packit Service 584ef9
    item_lock(hv);
Packit Service 584ef9
    it = do_item_get(key, nkey, hv, c, do_update);
Packit Service 584ef9
    item_unlock(hv);
Packit Service 584ef9
    return it;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
// returns an item with the item lock held.
Packit Service 584ef9
// lock will still be held even if return is NULL, allowing caller to replace
Packit Service 584ef9
// an item atomically if desired.
Packit Service 584ef9
item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv) {
Packit Service 584ef9
    item *it;
Packit Service 584ef9
    *hv = hash(key, nkey);
Packit Service 584ef9
    item_lock(*hv);
Packit Service 584ef9
    it = do_item_get(key, nkey, *hv, c, do_update);
Packit Service 584ef9
    return it;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
 * Locked wrapper around do_item_touch(): hashes the key, holds the item lock
 * for that hash value across the call, and returns whatever do_item_touch()
 * returned.
 */
item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
    const uint32_t hv = hash(key, nkey);
    item *touched;

    item_lock(hv);
    touched = do_item_touch(key, nkey, exptime, hv, c);
    item_unlock(hv);

    return touched;
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Links an item into the LRU and hashtable.
Packit Service 584ef9
 */
Packit Service 584ef9
int item_link(item *item) {
Packit Service 584ef9
    int ret;
Packit Service 584ef9
    uint32_t hv;
Packit Service 584ef9
Packit Service 584ef9
    hv = hash(ITEM_key(item), item->nkey);
Packit Service 584ef9
    item_lock(hv);
Packit Service 584ef9
    ret = do_item_link(item, hv);
Packit Service 584ef9
    item_unlock(hv);
Packit Service 584ef9
    return ret;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Decrements the reference count on an item and adds it to the freelist if
Packit Service 584ef9
 * needed.
Packit Service 584ef9
 */
Packit Service 584ef9
void item_remove(item *item) {
Packit Service 584ef9
    uint32_t hv;
Packit Service 584ef9
    hv = hash(ITEM_key(item), item->nkey);
Packit Service 584ef9
Packit Service 584ef9
    item_lock(hv);
Packit Service 584ef9
    do_item_remove(item);
Packit Service 584ef9
    item_unlock(hv);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Replaces one item with another in the hashtable.
Packit Service 584ef9
 * Unprotected by a mutex lock since the core server does not require
Packit Service 584ef9
 * it to be thread-safe.
Packit Service 584ef9
 */
Packit Service 584ef9
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
Packit Service 584ef9
    return do_item_replace(old_it, new_it, hv);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Unlinks an item from the LRU and hashtable.
Packit Service 584ef9
 */
Packit Service 584ef9
void item_unlink(item *item) {
Packit Service 584ef9
    uint32_t hv;
Packit Service 584ef9
    hv = hash(ITEM_key(item), item->nkey);
Packit Service 584ef9
    item_lock(hv);
Packit Service 584ef9
    do_item_unlink(item, hv);
Packit Service 584ef9
    item_unlock(hv);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Does arithmetic on a numeric item value.
Packit Service 584ef9
 */
Packit Service 584ef9
enum delta_result_type add_delta(conn *c, const char *key,
Packit Service 584ef9
                                 const size_t nkey, bool incr,
Packit Service 584ef9
                                 const int64_t delta, char *buf,
Packit Service 584ef9
                                 uint64_t *cas) {
Packit Service 584ef9
    enum delta_result_type ret;
Packit Service 584ef9
    uint32_t hv;
Packit Service 584ef9
Packit Service 584ef9
    hv = hash(key, nkey);
Packit Service 584ef9
    item_lock(hv);
Packit Service 584ef9
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
Packit Service 584ef9
    item_unlock(hv);
Packit Service 584ef9
    return ret;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
Packit Service 584ef9
 */
Packit Service 584ef9
enum store_item_type store_item(item *item, int comm, conn* c) {
Packit Service 584ef9
    enum store_item_type ret;
Packit Service 584ef9
    uint32_t hv;
Packit Service 584ef9
Packit Service 584ef9
    hv = hash(ITEM_key(item), item->nkey);
Packit Service 584ef9
    item_lock(hv);
Packit Service 584ef9
    ret = do_store_item(item, comm, c, hv);
Packit Service 584ef9
    item_unlock(hv);
Packit Service 584ef9
    return ret;
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/******************************* GLOBAL STATS ******************************/
Packit Service 584ef9
Packit Service 584ef9
void STATS_LOCK() {
Packit Service 584ef9
    pthread_mutex_lock(&stats_lock);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
void STATS_UNLOCK() {
Packit Service 584ef9
    pthread_mutex_unlock(&stats_lock);
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
void threadlocal_stats_reset(void) {
Packit Service 584ef9
    int ii;
Packit Service 584ef9
    for (ii = 0; ii < settings.num_threads; ++ii) {
Packit Service 584ef9
        pthread_mutex_lock(&threads[ii].stats.mutex);
Packit Service 584ef9
#define X(name) threads[ii].stats.name = 0;
Packit Service 584ef9
        THREAD_STATS_FIELDS
Packit Service 584ef9
#ifdef EXTSTORE
Packit Service 584ef9
        EXTSTORE_THREAD_STATS_FIELDS
Packit Service 584ef9
#endif
Packit Service 584ef9
#undef X
Packit Service 584ef9
Packit Service 584ef9
        memset(&threads[ii].stats.slab_stats, 0,
Packit Service 584ef9
                sizeof(threads[ii].stats.slab_stats));
Packit Service 584ef9
        memset(&threads[ii].stats.lru_hits, 0,
Packit Service 584ef9
                sizeof(uint64_t) * POWER_LARGEST);
Packit Service 584ef9
Packit Service 584ef9
        pthread_mutex_unlock(&threads[ii].stats.mutex);
Packit Service 584ef9
    }
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
void threadlocal_stats_aggregate(struct thread_stats *stats) {
Packit Service 584ef9
    int ii, sid;
Packit Service 584ef9
Packit Service 584ef9
    /* The struct has a mutex, but we can safely set the whole thing
Packit Service 584ef9
     * to zero since it is unused when aggregating. */
Packit Service 584ef9
    memset(stats, 0, sizeof(*stats));
Packit Service 584ef9
Packit Service 584ef9
    for (ii = 0; ii < settings.num_threads; ++ii) {
Packit Service 584ef9
        pthread_mutex_lock(&threads[ii].stats.mutex);
Packit Service 584ef9
#define X(name) stats->name += threads[ii].stats.name;
Packit Service 584ef9
        THREAD_STATS_FIELDS
Packit Service 584ef9
#ifdef EXTSTORE
Packit Service 584ef9
        EXTSTORE_THREAD_STATS_FIELDS
Packit Service 584ef9
#endif
Packit Service 584ef9
#undef X
Packit Service 584ef9
Packit Service 584ef9
        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
Packit Service 584ef9
#define X(name) stats->slab_stats[sid].name += \
Packit Service 584ef9
            threads[ii].stats.slab_stats[sid].name;
Packit Service 584ef9
            SLAB_STATS_FIELDS
Packit Service 584ef9
#undef X
Packit Service 584ef9
        }
Packit Service 584ef9
Packit Service 584ef9
        for (sid = 0; sid < POWER_LARGEST; sid++) {
Packit Service 584ef9
            stats->lru_hits[sid] +=
Packit Service 584ef9
                threads[ii].stats.lru_hits[sid];
Packit Service 584ef9
            stats->slab_stats[CLEAR_LRU(sid)].get_hits +=
Packit Service 584ef9
                threads[ii].stats.lru_hits[sid];
Packit Service 584ef9
        }
Packit Service 584ef9
Packit Service 584ef9
        pthread_mutex_unlock(&threads[ii].stats.mutex);
Packit Service 584ef9
    }
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
Packit Service 584ef9
    int sid;
Packit Service 584ef9
Packit Service 584ef9
    memset(out, 0, sizeof(*out));
Packit Service 584ef9
Packit Service 584ef9
    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
Packit Service 584ef9
#define X(name) out->name += stats->slab_stats[sid].name;
Packit Service 584ef9
        SLAB_STATS_FIELDS
Packit Service 584ef9
#undef X
Packit Service 584ef9
    }
Packit Service 584ef9
}
Packit Service 584ef9
Packit Service 584ef9
/*
Packit Service 584ef9
 * Initializes the thread subsystem, creating various worker threads.
Packit Service 584ef9
 *
Packit Service 584ef9
 * nthreads  Number of worker event handler threads to spawn
Packit Service 584ef9
 */
Packit Service 584ef9
void memcached_thread_init(int nthreads, void *arg) {
Packit Service 584ef9
    int         i;
Packit Service 584ef9
    int         power;
Packit Service 584ef9
Packit Service 584ef9
    for (i = 0; i < POWER_LARGEST; i++) {
Packit Service 584ef9
        pthread_mutex_init(&lru_locks[i], NULL);
Packit Service 584ef9
    }
Packit Service 584ef9
    pthread_mutex_init(&worker_hang_lock, NULL);
Packit Service 584ef9
Packit Service 584ef9
    pthread_mutex_init(&init_lock, NULL);
Packit Service 584ef9
    pthread_cond_init(&init_cond, NULL);
Packit Service 584ef9
Packit Service 584ef9
    pthread_mutex_init(&cqi_freelist_lock, NULL);
Packit Service 584ef9
    cqi_freelist = NULL;
Packit Service 584ef9
Packit Service 584ef9
    /* Want a wide lock table, but don't waste memory */
Packit Service 584ef9
    if (nthreads < 3) {
Packit Service 584ef9
        power = 10;
Packit Service 584ef9
    } else if (nthreads < 4) {
Packit Service 584ef9
        power = 11;
Packit Service 584ef9
    } else if (nthreads < 5) {
Packit Service 584ef9
        power = 12;
Packit Service 584ef9
    } else if (nthreads <= 10) {
Packit Service 584ef9
        power = 13;
Packit Service 584ef9
    } else if (nthreads <= 20) {
Packit Service 584ef9
        power = 14;
Packit Service 584ef9
    } else {
Packit Service 584ef9
        /* 32k buckets. just under the hashpower default. */
Packit Service 584ef9
        power = 15;
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    if (power >= hashpower) {
Packit Service 584ef9
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
Packit Service 584ef9
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
Packit Service 584ef9
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
Packit Service 584ef9
        exit(1);
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    item_lock_count = hashsize(power);
Packit Service 584ef9
    item_lock_hashpower = power;
Packit Service 584ef9
Packit Service 584ef9
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
Packit Service 584ef9
    if (! item_locks) {
Packit Service 584ef9
        perror("Can't allocate item locks");
Packit Service 584ef9
        exit(1);
Packit Service 584ef9
    }
Packit Service 584ef9
    for (i = 0; i < item_lock_count; i++) {
Packit Service 584ef9
        pthread_mutex_init(&item_locks[i], NULL);
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
Packit Service 584ef9
    if (! threads) {
Packit Service 584ef9
        perror("Can't allocate thread descriptors");
Packit Service 584ef9
        exit(1);
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    for (i = 0; i < nthreads; i++) {
Packit Service 584ef9
        int fds[2];
Packit Service 584ef9
        if (pipe(fds)) {
Packit Service 584ef9
            perror("Can't create notify pipe");
Packit Service 584ef9
            exit(1);
Packit Service 584ef9
        }
Packit Service 584ef9
Packit Service 584ef9
        threads[i].notify_receive_fd = fds[0];
Packit Service 584ef9
        threads[i].notify_send_fd = fds[1];
Packit Service 584ef9
#ifdef EXTSTORE
Packit Service 584ef9
        threads[i].storage = arg;
Packit Service 584ef9
#endif
Packit Service 584ef9
        setup_thread(&threads[i]);
Packit Service 584ef9
        /* Reserve three fds for the libevent base, and two for the pipe */
Packit Service 584ef9
        stats_state.reserved_fds += 5;
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    /* Create threads after we've done all the libevent setup. */
Packit Service 584ef9
    for (i = 0; i < nthreads; i++) {
Packit Service 584ef9
        create_worker(worker_libevent, &threads[i]);
Packit Service 584ef9
    }
Packit Service 584ef9
Packit Service 584ef9
    /* Wait for all the threads to set themselves up before returning. */
Packit Service 584ef9
    pthread_mutex_lock(&init_lock);
Packit Service 584ef9
    wait_for_thread_registration(nthreads);
Packit Service 584ef9
    pthread_mutex_unlock(&init_lock);
Packit Service 584ef9
}
Packit Service 584ef9