|
Packit |
4e8bc4 |
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// FIXME: config.h?
|
|
Packit |
4e8bc4 |
#include <stdint.h>
|
|
Packit |
4e8bc4 |
#include <stdbool.h>
|
|
Packit |
4e8bc4 |
// end FIXME
|
|
Packit |
4e8bc4 |
#include <stdlib.h>
|
|
Packit |
4e8bc4 |
#include <limits.h>
|
|
Packit |
4e8bc4 |
#include <pthread.h>
|
|
Packit |
4e8bc4 |
#include <sys/types.h>
|
|
Packit |
4e8bc4 |
#include <sys/stat.h>
|
|
Packit |
4e8bc4 |
#include <sys/uio.h>
|
|
Packit |
4e8bc4 |
#include <fcntl.h>
|
|
Packit |
4e8bc4 |
#include <unistd.h>
|
|
Packit |
4e8bc4 |
#include <stdio.h>
|
|
Packit |
4e8bc4 |
#include <string.h>
|
|
Packit |
4e8bc4 |
#include <assert.h>
|
|
Packit |
4e8bc4 |
#include "extstore.h"
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// TODO: better if an init option turns this on/off.
|
|
Packit |
4e8bc4 |
/* Debug tracing. With EXTSTORE_DEBUG defined, E_DEBUG() forwards its
 * printf-style arguments to stderr; otherwise it is a no-op.
 * Both variants expand to a single do/while(0) statement, so the macro can be
 * used as the unbraced body of an if/else without leaving a bare ';' behind
 * (which triggers empty-body warnings and is easy to misread). */
#ifdef EXTSTORE_DEBUG
#define E_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#else
#define E_DEBUG(...) \
    do { \
    } while (0)
#endif
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Statistics helpers. Every access to e->stats is serialized by
 * e->stats_mutex.
 *
 * The engine argument and the amount are parenthesized so that any
 * expression (e.g. `t->e`, `base + 1`, `a ? b : c`) can be passed safely.
 *
 * STAT_L / STAT_UL keep their historical trailing ';' and STAT_INCR /
 * STAT_DECR keep their plain-brace form, so existing call sites expand
 * exactly as before. Because of the brace form, do not use STAT_INCR /
 * STAT_DECR as the unbraced body of an if that has an else. */
#define STAT_L(e) pthread_mutex_lock(&(e)->stats_mutex);
#define STAT_UL(e) pthread_mutex_unlock(&(e)->stats_mutex);
#define STAT_INCR(e, stat, amount) { \
    pthread_mutex_lock(&(e)->stats_mutex); \
    (e)->stats.stat += (amount); \
    pthread_mutex_unlock(&(e)->stats_mutex); \
}

#define STAT_DECR(e, stat, amount) { \
    pthread_mutex_lock(&(e)->stats_mutex); \
    (e)->stats.stat -= (amount); \
    pthread_mutex_unlock(&(e)->stats_mutex); \
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Write buffer: an in-memory staging area for one wbuf-sized slice of a
 * page. Idle buffers sit on the engine's wbuf_stack, chained through `next`;
 * an in-use buffer is attached to a page via store_page.wbuf. */
typedef struct __store_wbuf {
    struct __store_wbuf *next; /* link used while on the engine's wbuf freelist */
    char *buf; /* start of the malloc'ed data area */
    char *buf_pos; /* where the next object will be copied in */
    unsigned int free; /* bytes of buf not yet handed out */
    unsigned int size; /* total capacity of buf in bytes */
    unsigned int offset; /* offset into page this write starts at */
    bool full; /* done writing to this page */
    bool flushed; /* whether wbuf has been flushed to disk */
} _store_wbuf;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* One fixed-size page of backing storage. Pages live in the engine's
 * `pages` array and are additionally linked, through `next`, into either a
 * free list or a bucket's stack of allocated pages. */
typedef struct _store_page {
    pthread_mutex_t mutex; /* Need to be held for most operations */
    uint64_t obj_count; /* _delete can decrease post-closing */
    uint64_t bytes_used; /* _delete can decrease post-closing */
    uint64_t offset; /* starting address of page within fd */
    unsigned int version; /* assigned from the engine counter on allocation; callers' page_version must match */
    unsigned int refcount; /* NOTE(review): not touched in the visible part of this file; presumably counts in-flight readers - confirm */
    unsigned int allocated; /* bytes of the page already assigned to write buffers */
    unsigned int written; /* item offsets can be past written if wbuf not flushed */
    unsigned int bucket; /* which bucket the page is linked into */
    unsigned int free_bucket; /* which bucket this page returns to when freed */
    int fd; /* descriptor of the backing file this page lives in */
    unsigned short id; /* index of this page within the engine's pages array */
    bool active; /* actively being written to */
    bool closed; /* closed and draining before free */
    bool free; /* on freelist */
    _store_wbuf *wbuf; /* currently active wbuf from the stack */
    struct _store_page *next; /* next page on the same free list / bucket stack */
} store_page;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Forward declaration: the thread structs below point back at the engine. */
typedef struct store_engine store_engine;
/* Per-IO-thread context: a queue of pending IO objects plus the mutex and
 * condition variable used to hand work to the thread. */
typedef struct {
    pthread_mutex_t mutex; /* guards queue and depth */
    pthread_cond_t cond; /* signalled by extstore_submit() when work is queued */
    obj_io *queue; /* singly linked list of pending IOs (linked via obj_io.next) */
    store_engine *e; /* owning engine */
    unsigned int depth; // queue depth
} store_io_thread;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Context for the single maintenance thread. extstore_run_maint() signals
 * `cond` to request a maintenance pass. */
typedef struct {
    pthread_mutex_t mutex; /* initialised alongside cond in extstore_init() */
    pthread_cond_t cond; /* signalled to wake the maintenance thread */
    store_engine *e; /* owning engine */
} store_maint_thread;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Top-level engine state. `mutex` protects the free lists, stacks and
 * counters; `stats_mutex` separately protects `stats`. */
struct store_engine {
    pthread_mutex_t mutex; /* covers internal stacks and variables */
    store_page *pages; /* directly addressable page list */
    _store_wbuf *wbuf_stack; /* wbuf freelist */
    obj_io *io_stack; /* IO's to use with submitting wbuf's */
    store_io_thread *io_threads; /* array of io_threadcount IO thread contexts */
    store_maint_thread *maint_thread; /* the single maintenance thread context */
    store_page *page_freelist; /* free pages whose free_bucket is 0 (the general pool) */
    store_page **page_buckets; /* stack of pages currently allocated to each bucket */
    store_page **free_page_buckets; /* stack of use-case isolated free pages */
    size_t page_size; /* size of every page in bytes */
    unsigned int version; /* global version counter */
    unsigned int last_io_thread; /* round robin the IO threads */
    unsigned int io_threadcount; /* count of IO threads */
    unsigned int page_count; /* total number of entries in pages */
    unsigned int page_free; /* unallocated pages */
    unsigned int page_bucketcount; /* count of potential page buckets */
    unsigned int free_page_bucketcount; /* count of free page buckets */
    unsigned int io_depth; /* FIXME: Might cache into thr struct */
    pthread_mutex_t stats_mutex; /* guards stats (see STAT_L / STAT_UL) */
    struct extstore_stats stats; /* counters returned by extstore_get_stats() */
};
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
static _store_wbuf *wbuf_new(size_t size) {
|
|
Packit |
4e8bc4 |
_store_wbuf *b = calloc(1, sizeof(_store_wbuf));
|
|
Packit |
4e8bc4 |
if (b == NULL)
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
b->buf = malloc(size);
|
|
Packit |
4e8bc4 |
if (b->buf == NULL) {
|
|
Packit |
4e8bc4 |
free(b);
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
b->buf_pos = b->buf;
|
|
Packit |
4e8bc4 |
b->free = size;
|
|
Packit |
4e8bc4 |
b->size = size;
|
|
Packit |
4e8bc4 |
return b;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Picks the IO thread that should receive the next submission: the first
 * thread found with an empty queue, otherwise the one with the smallest
 * depth. The scan holds e->mutex but not the per-thread mutexes, so depth
 * values may be slightly stale; a sub-optimal pick is not fatal.
 * The engine must have at least one IO thread. */
static store_io_thread *_get_io_thread(store_engine *e) {
    int chosen = -1;
    long long int best_depth = LLONG_MAX;

    pthread_mutex_lock(&e->mutex);
    // TODO: if average queue depth can be quickly tracked, can break as soon
    // as we see a thread that's less than average, and start from last_io_thread
    unsigned int idx = 0;
    while (idx < e->io_threadcount) {
        store_io_thread *cand = &e->io_threads[idx];
        if (cand->depth == 0) {
            // idle thread: cannot do better than this.
            chosen = (int)idx;
            break;
        }
        if (cand->depth < best_depth) {
            chosen = (int)idx;
            best_depth = cand->depth;
        }
        idx++;
    }
    pthread_mutex_unlock(&e->mutex);

    return &e->io_threads[chosen];
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Returns the next page version and advances the engine's global counter.
 * Version 0 is reserved as the "page is freed" marker, so when the unsigned
 * counter wraps around it restarts at 1 rather than ever handing out 0.
 * No locking is done here; the visible caller (_allocate_page) runs with
 * e->mutex held. */
static uint64_t _next_version(store_engine *e) {
    unsigned int current = e->version++;
    if (e->version == 0) {
        // wrapped: skip the reserved "freed" value.
        e->version = 1;
    }
    return current;
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
static void *extstore_io_thread(void *arg);
|
|
Packit |
4e8bc4 |
static void *extstore_maint_thread(void *arg);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Copies stats internal to engine and computes any derived values */
|
|
Packit |
4e8bc4 |
void extstore_get_stats(void *ptr, struct extstore_stats *st) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ptr;
|
|
Packit |
4e8bc4 |
STAT_L(e);
|
|
Packit |
4e8bc4 |
memcpy(st, &e->stats, sizeof(struct extstore_stats));
|
|
Packit |
4e8bc4 |
STAT_UL(e);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// grab pages_free/pages_used
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&e->mutex);
|
|
Packit |
4e8bc4 |
st->pages_free = e->page_free;
|
|
Packit |
4e8bc4 |
st->pages_used = e->page_count - e->page_free;
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&e->mutex);
|
|
Packit |
4e8bc4 |
st->io_queue = 0;
|
|
Packit |
4e8bc4 |
for (int x = 0; x < e->io_threadcount; x++) {
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&e->io_threads[x].mutex);
|
|
Packit |
4e8bc4 |
st->io_queue += e->io_threads[x].depth;
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&e->io_threads[x].mutex);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
// calculate bytes_fragmented.
|
|
Packit |
4e8bc4 |
// note that open and yet-filled pages count against fragmentation.
|
|
Packit |
4e8bc4 |
st->bytes_fragmented = st->pages_used * e->page_size -
|
|
Packit |
4e8bc4 |
st->bytes_used;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
void extstore_get_page_data(void *ptr, struct extstore_stats *st) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ptr;
|
|
Packit |
4e8bc4 |
STAT_L(e);
|
|
Packit |
4e8bc4 |
memcpy(st->page_data, e->stats.page_data,
|
|
Packit |
4e8bc4 |
sizeof(struct extstore_page_data) * e->page_count);
|
|
Packit |
4e8bc4 |
STAT_UL(e);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
const char *extstore_err(enum extstore_res res) {
|
|
Packit |
4e8bc4 |
const char *rv = "unknown error";
|
|
Packit |
4e8bc4 |
switch (res) {
|
|
Packit |
4e8bc4 |
case EXTSTORE_INIT_BAD_WBUF_SIZE:
|
|
Packit |
4e8bc4 |
rv = "page_size must be divisible by wbuf_size";
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
case EXTSTORE_INIT_NEED_MORE_WBUF:
|
|
Packit |
4e8bc4 |
rv = "wbuf_count must be >= page_buckets";
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
case EXTSTORE_INIT_NEED_MORE_BUCKETS:
|
|
Packit |
4e8bc4 |
rv = "page_buckets must be > 0";
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
case EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT:
|
|
Packit |
4e8bc4 |
rv = "page_size and wbuf_size must be divisible by 1024*1024*2";
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
case EXTSTORE_INIT_TOO_MANY_PAGES:
|
|
Packit |
4e8bc4 |
rv = "page_count must total to < 65536. Increase page_size or lower path sizes";
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
case EXTSTORE_INIT_OOM:
|
|
Packit |
4e8bc4 |
rv = "failed calloc for engine";
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
case EXTSTORE_INIT_OPEN_FAIL:
|
|
Packit |
4e8bc4 |
rv = "failed to open file";
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
case EXTSTORE_INIT_THREAD_FAIL:
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
return rv;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// TODO: #define's for DEFAULT_BUCKET, FREE_VERSION, etc
|
|
Packit |
4e8bc4 |
void *extstore_init(struct extstore_conf_file *fh, struct extstore_conf *cf,
|
|
Packit |
4e8bc4 |
enum extstore_res *res) {
|
|
Packit |
4e8bc4 |
int i;
|
|
Packit |
4e8bc4 |
struct extstore_conf_file *f = NULL;
|
|
Packit |
4e8bc4 |
pthread_t thread;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
if (cf->page_size % cf->wbuf_size != 0) {
|
|
Packit |
4e8bc4 |
*res = EXTSTORE_INIT_BAD_WBUF_SIZE;
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
// Should ensure at least one write buffer per potential page
|
|
Packit |
4e8bc4 |
if (cf->page_buckets > cf->wbuf_count) {
|
|
Packit |
4e8bc4 |
*res = EXTSTORE_INIT_NEED_MORE_WBUF;
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
if (cf->page_buckets < 1) {
|
|
Packit |
4e8bc4 |
*res = EXTSTORE_INIT_NEED_MORE_BUCKETS;
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// TODO: More intelligence around alignment of flash erasure block sizes
|
|
Packit |
4e8bc4 |
if (cf->page_size % (1024 * 1024 * 2) != 0 ||
|
|
Packit |
4e8bc4 |
cf->wbuf_size % (1024 * 1024 * 2) != 0) {
|
|
Packit |
4e8bc4 |
*res = EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT;
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
store_engine *e = calloc(1, sizeof(store_engine));
|
|
Packit |
4e8bc4 |
if (e == NULL) {
|
|
Packit |
4e8bc4 |
*res = EXTSTORE_INIT_OOM;
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
e->page_size = cf->page_size;
|
|
Packit |
4e8bc4 |
uint64_t temp_page_count = 0;
|
|
Packit |
4e8bc4 |
for (f = fh; f != NULL; f = f->next) {
|
|
Packit |
4e8bc4 |
f->fd = open(f->file, O_RDWR | O_CREAT | O_TRUNC, 0644);
|
|
Packit |
4e8bc4 |
if (f->fd < 0) {
|
|
Packit |
4e8bc4 |
*res = EXTSTORE_INIT_OPEN_FAIL;
|
|
Packit |
4e8bc4 |
#ifdef EXTSTORE_DEBUG
|
|
Packit |
4e8bc4 |
perror("open");
|
|
Packit |
4e8bc4 |
#endif
|
|
Packit |
4e8bc4 |
free(e);
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
temp_page_count += f->page_count;
|
|
Packit |
4e8bc4 |
f->offset = 0;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
if (temp_page_count >= UINT16_MAX) {
|
|
Packit |
4e8bc4 |
*res = EXTSTORE_INIT_TOO_MANY_PAGES;
|
|
Packit |
4e8bc4 |
free(e);
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
e->page_count = temp_page_count;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
e->pages = calloc(e->page_count, sizeof(store_page));
|
|
Packit |
4e8bc4 |
if (e->pages == NULL) {
|
|
Packit |
4e8bc4 |
*res = EXTSTORE_INIT_OOM;
|
|
Packit |
4e8bc4 |
// FIXME: loop-close. make error label
|
|
Packit |
4e8bc4 |
free(e);
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// interleave the pages between devices
|
|
Packit |
4e8bc4 |
f = NULL; // start at the first device.
|
|
Packit |
4e8bc4 |
for (i = 0; i < e->page_count; i++) {
|
|
Packit |
4e8bc4 |
// find next device with available pages
|
|
Packit |
4e8bc4 |
while (1) {
|
|
Packit |
4e8bc4 |
// restart the loop
|
|
Packit |
4e8bc4 |
if (f == NULL || f->next == NULL) {
|
|
Packit |
4e8bc4 |
f = fh;
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
f = f->next;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
if (f->page_count) {
|
|
Packit |
4e8bc4 |
f->page_count--;
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_init(&e->pages[i].mutex, NULL);
|
|
Packit |
4e8bc4 |
e->pages[i].id = i;
|
|
Packit |
4e8bc4 |
e->pages[i].fd = f->fd;
|
|
Packit |
4e8bc4 |
e->pages[i].free_bucket = f->free_bucket;
|
|
Packit |
4e8bc4 |
e->pages[i].offset = f->offset;
|
|
Packit |
4e8bc4 |
e->pages[i].free = true;
|
|
Packit |
4e8bc4 |
f->offset += e->page_size;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// free page buckets allows the app to organize devices by use case
|
|
Packit |
4e8bc4 |
e->free_page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
|
|
Packit |
4e8bc4 |
e->page_bucketcount = cf->page_buckets;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
for (i = e->page_count-1; i > 0; i--) {
|
|
Packit |
4e8bc4 |
e->page_free++;
|
|
Packit |
4e8bc4 |
if (e->pages[i].free_bucket == 0) {
|
|
Packit |
4e8bc4 |
e->pages[i].next = e->page_freelist;
|
|
Packit |
4e8bc4 |
e->page_freelist = &e->pages[i];
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
int fb = e->pages[i].free_bucket;
|
|
Packit |
4e8bc4 |
e->pages[i].next = e->free_page_buckets[fb];
|
|
Packit |
4e8bc4 |
e->free_page_buckets[fb] = &e->pages[i];
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// 0 is magic "page is freed" version
|
|
Packit |
4e8bc4 |
e->version = 1;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// scratch data for stats. TODO: malloc failure handle
|
|
Packit |
4e8bc4 |
e->stats.page_data =
|
|
Packit |
4e8bc4 |
calloc(e->page_count, sizeof(struct extstore_page_data));
|
|
Packit |
4e8bc4 |
e->stats.page_count = e->page_count;
|
|
Packit |
4e8bc4 |
e->stats.page_size = e->page_size;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// page buckets lazily have pages assigned into them
|
|
Packit |
4e8bc4 |
e->page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
|
|
Packit |
4e8bc4 |
e->page_bucketcount = cf->page_buckets;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// allocate write buffers
|
|
Packit |
4e8bc4 |
// also IO's to use for shipping to IO thread
|
|
Packit |
4e8bc4 |
for (i = 0; i < cf->wbuf_count; i++) {
|
|
Packit |
4e8bc4 |
_store_wbuf *w = wbuf_new(cf->wbuf_size);
|
|
Packit |
4e8bc4 |
obj_io *io = calloc(1, sizeof(obj_io));
|
|
Packit |
4e8bc4 |
/* TODO: on error, loop again and free stack. */
|
|
Packit |
4e8bc4 |
w->next = e->wbuf_stack;
|
|
Packit |
4e8bc4 |
e->wbuf_stack = w;
|
|
Packit |
4e8bc4 |
io->next = e->io_stack;
|
|
Packit |
4e8bc4 |
e->io_stack = io;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_mutex_init(&e->mutex, NULL);
|
|
Packit |
4e8bc4 |
pthread_mutex_init(&e->stats_mutex, NULL);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
e->io_depth = cf->io_depth;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// spawn threads
|
|
Packit |
4e8bc4 |
e->io_threads = calloc(cf->io_threadcount, sizeof(store_io_thread));
|
|
Packit |
4e8bc4 |
for (i = 0; i < cf->io_threadcount; i++) {
|
|
Packit |
4e8bc4 |
pthread_mutex_init(&e->io_threads[i].mutex, NULL);
|
|
Packit |
4e8bc4 |
pthread_cond_init(&e->io_threads[i].cond, NULL);
|
|
Packit |
4e8bc4 |
e->io_threads[i].e = e;
|
|
Packit |
4e8bc4 |
// FIXME: error handling
|
|
Packit |
4e8bc4 |
pthread_create(&thread, NULL, extstore_io_thread, &e->io_threads[i]);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
e->io_threadcount = cf->io_threadcount;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
e->maint_thread = calloc(1, sizeof(store_maint_thread));
|
|
Packit |
4e8bc4 |
e->maint_thread->e = e;
|
|
Packit |
4e8bc4 |
// FIXME: error handling
|
|
Packit |
4e8bc4 |
pthread_mutex_init(&e->maint_thread->mutex, NULL);
|
|
Packit |
4e8bc4 |
pthread_cond_init(&e->maint_thread->cond, NULL);
|
|
Packit |
4e8bc4 |
pthread_create(&thread, NULL, extstore_maint_thread, e->maint_thread);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
extstore_run_maint(e);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
return (void *)e;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
void extstore_run_maint(void *ptr) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ptr;
|
|
Packit |
4e8bc4 |
pthread_cond_signal(&e->maint_thread->cond);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// call with *e locked
|
|
Packit |
4e8bc4 |
static store_page *_allocate_page(store_engine *e, unsigned int bucket,
|
|
Packit |
4e8bc4 |
unsigned int free_bucket) {
|
|
Packit |
4e8bc4 |
assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size);
|
|
Packit |
4e8bc4 |
store_page *tmp = NULL;
|
|
Packit |
4e8bc4 |
// if a specific free bucket was requested, check there first
|
|
Packit |
4e8bc4 |
if (free_bucket != 0 && e->free_page_buckets[free_bucket] != NULL) {
|
|
Packit |
4e8bc4 |
assert(e->page_free > 0);
|
|
Packit |
4e8bc4 |
tmp = e->free_page_buckets[free_bucket];
|
|
Packit |
4e8bc4 |
e->free_page_buckets[free_bucket] = tmp->next;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
// failing that, try the global list.
|
|
Packit |
4e8bc4 |
if (tmp == NULL && e->page_freelist != NULL) {
|
|
Packit |
4e8bc4 |
tmp = e->page_freelist;
|
|
Packit |
4e8bc4 |
e->page_freelist = tmp->next;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
E_DEBUG("EXTSTORE: allocating new page\n");
|
|
Packit |
4e8bc4 |
// page_freelist can be empty if the only free pages are specialized and
|
|
Packit |
4e8bc4 |
// we didn't just request one.
|
|
Packit |
4e8bc4 |
if (e->page_free > 0 && tmp != NULL) {
|
|
Packit |
4e8bc4 |
tmp->next = e->page_buckets[bucket];
|
|
Packit |
4e8bc4 |
e->page_buckets[bucket] = tmp;
|
|
Packit |
4e8bc4 |
tmp->active = true;
|
|
Packit |
4e8bc4 |
tmp->free = false;
|
|
Packit |
4e8bc4 |
tmp->closed = false;
|
|
Packit |
4e8bc4 |
tmp->version = _next_version(e);
|
|
Packit |
4e8bc4 |
tmp->bucket = bucket;
|
|
Packit |
4e8bc4 |
e->page_free--;
|
|
Packit |
4e8bc4 |
STAT_INCR(e, page_allocs, 1);
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
extstore_run_maint(e);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
if (tmp)
|
|
Packit |
4e8bc4 |
E_DEBUG("EXTSTORE: got page %u\n", tmp->id);
|
|
Packit |
4e8bc4 |
return tmp;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// call with *p locked. locks *e
|
|
Packit |
4e8bc4 |
static void _allocate_wbuf(store_engine *e, store_page *p) {
|
|
Packit |
4e8bc4 |
_store_wbuf *wbuf = NULL;
|
|
Packit |
4e8bc4 |
assert(!p->wbuf);
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&e->mutex);
|
|
Packit |
4e8bc4 |
if (e->wbuf_stack) {
|
|
Packit |
4e8bc4 |
wbuf = e->wbuf_stack;
|
|
Packit |
4e8bc4 |
e->wbuf_stack = wbuf->next;
|
|
Packit |
4e8bc4 |
wbuf->next = 0;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&e->mutex);
|
|
Packit |
4e8bc4 |
if (wbuf) {
|
|
Packit |
4e8bc4 |
wbuf->offset = p->allocated;
|
|
Packit |
4e8bc4 |
p->allocated += wbuf->size;
|
|
Packit |
4e8bc4 |
wbuf->free = wbuf->size;
|
|
Packit |
4e8bc4 |
wbuf->buf_pos = wbuf->buf;
|
|
Packit |
4e8bc4 |
wbuf->full = false;
|
|
Packit |
4e8bc4 |
wbuf->flushed = false;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
p->wbuf = wbuf;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* callback after wbuf is flushed. can only remove wbuf's from the head onward
|
|
Packit |
4e8bc4 |
* if successfully flushed, which complicates this routine. each callback
|
|
Packit |
4e8bc4 |
* attempts to free the wbuf stack, which is finally done when the head wbuf's
|
|
Packit |
4e8bc4 |
* callback happens.
|
|
Packit |
4e8bc4 |
* It's rare flushes would happen out of order.
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
static void _wbuf_cb(void *ep, obj_io *io, int ret) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ep;
|
|
Packit |
4e8bc4 |
store_page *p = &e->pages[io->page_id];
|
|
Packit |
4e8bc4 |
_store_wbuf *w = (_store_wbuf *) io->data;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// TODO: Examine return code. Not entirely sure how to handle errors.
|
|
Packit |
4e8bc4 |
// Naive first-pass should probably cause the page to close/free.
|
|
Packit |
4e8bc4 |
w->flushed = true;
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&p->mutex);
|
|
Packit |
4e8bc4 |
assert(p->wbuf != NULL && p->wbuf == w);
|
|
Packit |
4e8bc4 |
assert(p->written == w->offset);
|
|
Packit |
4e8bc4 |
p->written += w->size;
|
|
Packit |
4e8bc4 |
p->wbuf = NULL;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
if (p->written == e->page_size)
|
|
Packit |
4e8bc4 |
p->active = false;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// return the wbuf
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&e->mutex);
|
|
Packit |
4e8bc4 |
w->next = e->wbuf_stack;
|
|
Packit |
4e8bc4 |
e->wbuf_stack = w;
|
|
Packit |
4e8bc4 |
// also return the IO we just used.
|
|
Packit |
4e8bc4 |
io->next = e->io_stack;
|
|
Packit |
4e8bc4 |
e->io_stack = io;
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&e->mutex);
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Wraps pages current wbuf in an io and submits to IO thread.
|
|
Packit |
4e8bc4 |
* Called with p locked, locks e.
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
static void _submit_wbuf(store_engine *e, store_page *p) {
|
|
Packit |
4e8bc4 |
_store_wbuf *w;
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&e->mutex);
|
|
Packit |
4e8bc4 |
obj_io *io = e->io_stack;
|
|
Packit |
4e8bc4 |
e->io_stack = io->next;
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&e->mutex);
|
|
Packit |
4e8bc4 |
w = p->wbuf;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// zero out the end of the wbuf to allow blind readback of data.
|
|
Packit |
4e8bc4 |
memset(w->buf + (w->size - w->free), 0, w->free);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
io->next = NULL;
|
|
Packit |
4e8bc4 |
io->mode = OBJ_IO_WRITE;
|
|
Packit |
4e8bc4 |
io->page_id = p->id;
|
|
Packit |
4e8bc4 |
io->data = w;
|
|
Packit |
4e8bc4 |
io->offset = w->offset;
|
|
Packit |
4e8bc4 |
io->len = w->size;
|
|
Packit |
4e8bc4 |
io->buf = w->buf;
|
|
Packit |
4e8bc4 |
io->cb = _wbuf_cb;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
extstore_submit(e, io);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* engine write function; takes engine, item_io.
|
|
Packit |
4e8bc4 |
* fast fail if no available write buffer (flushing)
|
|
Packit |
4e8bc4 |
* lock engine context, find active page, unlock
|
|
Packit |
4e8bc4 |
* if page full, submit page/buffer to io thread.
|
|
Packit |
4e8bc4 |
*
|
|
Packit |
4e8bc4 |
* write is designed to be flaky; if page full, caller must try again to get
|
|
Packit |
4e8bc4 |
* new page. best if used from a background thread that can harmlessly retry.
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
int extstore_write_request(void *ptr, unsigned int bucket,
|
|
Packit |
4e8bc4 |
unsigned int free_bucket, obj_io *io) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ptr;
|
|
Packit |
4e8bc4 |
store_page *p;
|
|
Packit |
4e8bc4 |
int ret = -1;
|
|
Packit |
4e8bc4 |
if (bucket >= e->page_bucketcount)
|
|
Packit |
4e8bc4 |
return ret;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&e->mutex);
|
|
Packit |
4e8bc4 |
p = e->page_buckets[bucket];
|
|
Packit |
4e8bc4 |
if (!p) {
|
|
Packit |
4e8bc4 |
p = _allocate_page(e, bucket, free_bucket);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&e->mutex);
|
|
Packit |
4e8bc4 |
if (!p)
|
|
Packit |
4e8bc4 |
return ret;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&p->mutex);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// FIXME: can't null out page_buckets!!!
|
|
Packit |
4e8bc4 |
// page is full, clear bucket and retry later.
|
|
Packit |
4e8bc4 |
if (!p->active ||
|
|
Packit |
4e8bc4 |
((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) {
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&e->mutex);
|
|
Packit |
4e8bc4 |
_allocate_page(e, bucket, free_bucket);
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&e->mutex);
|
|
Packit |
4e8bc4 |
return ret;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// if io won't fit, submit IO for wbuf and find new one.
|
|
Packit |
4e8bc4 |
if (p->wbuf && p->wbuf->free < io->len && !p->wbuf->full) {
|
|
Packit |
4e8bc4 |
_submit_wbuf(e, p);
|
|
Packit |
4e8bc4 |
p->wbuf->full = true;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
if (!p->wbuf && p->allocated < e->page_size) {
|
|
Packit |
4e8bc4 |
_allocate_wbuf(e, p);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// hand over buffer for caller to copy into
|
|
Packit |
4e8bc4 |
// leaves p locked.
|
|
Packit |
4e8bc4 |
if (p->wbuf && !p->wbuf->full && p->wbuf->free >= io->len) {
|
|
Packit |
4e8bc4 |
io->buf = p->wbuf->buf_pos;
|
|
Packit |
4e8bc4 |
io->page_id = p->id;
|
|
Packit |
4e8bc4 |
return 0;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
// p->written is incremented post-wbuf flush
|
|
Packit |
4e8bc4 |
return ret;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* _must_ be called after a successful write_request.
|
|
Packit |
4e8bc4 |
* fills the rest of io structure.
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
void extstore_write(void *ptr, obj_io *io) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ptr;
|
|
Packit |
4e8bc4 |
store_page *p = &e->pages[io->page_id];
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
io->offset = p->wbuf->offset + (p->wbuf->size - p->wbuf->free);
|
|
Packit |
4e8bc4 |
io->page_version = p->version;
|
|
Packit |
4e8bc4 |
p->wbuf->buf_pos += io->len;
|
|
Packit |
4e8bc4 |
p->wbuf->free -= io->len;
|
|
Packit |
4e8bc4 |
p->bytes_used += io->len;
|
|
Packit |
4e8bc4 |
p->obj_count++;
|
|
Packit |
4e8bc4 |
STAT_L(e);
|
|
Packit |
4e8bc4 |
e->stats.bytes_written += io->len;
|
|
Packit |
4e8bc4 |
e->stats.bytes_used += io->len;
|
|
Packit |
4e8bc4 |
e->stats.objects_written++;
|
|
Packit |
4e8bc4 |
e->stats.objects_used++;
|
|
Packit |
4e8bc4 |
STAT_UL(e);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* engine submit function; takes engine, item_io stack.
|
|
Packit |
4e8bc4 |
* lock io_thread context and add stack?
|
|
Packit |
4e8bc4 |
* signal io thread to wake.
|
|
Packit |
4e8bc4 |
* return success.
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
int extstore_submit(void *ptr, obj_io *io) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ptr;
|
|
Packit |
4e8bc4 |
store_io_thread *t = _get_io_thread(e);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&t->mutex);
|
|
Packit |
4e8bc4 |
if (t->queue == NULL) {
|
|
Packit |
4e8bc4 |
t->queue = io;
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
/* Have to put the *io stack at the end of current queue.
|
|
Packit |
4e8bc4 |
* FIXME: Optimize by tracking tail.
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
obj_io *tmp = t->queue;
|
|
Packit |
4e8bc4 |
while (tmp->next != NULL) {
|
|
Packit |
4e8bc4 |
tmp = tmp->next;
|
|
Packit |
4e8bc4 |
assert(tmp != t->queue);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
tmp->next = io;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
// TODO: extstore_submit(ptr, io, count)
|
|
Packit |
4e8bc4 |
obj_io *tio = io;
|
|
Packit |
4e8bc4 |
while (tio != NULL) {
|
|
Packit |
4e8bc4 |
t->depth++;
|
|
Packit |
4e8bc4 |
tio = tio->next;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&t->mutex);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
//pthread_mutex_lock(&t->mutex);
|
|
Packit |
4e8bc4 |
pthread_cond_signal(&t->cond);
|
|
Packit |
4e8bc4 |
//pthread_mutex_unlock(&t->mutex);
|
|
Packit |
4e8bc4 |
return 0;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* engine note delete function: takes engine, page id, size?
|
|
Packit |
4e8bc4 |
* note that an item in this page is no longer valid
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version,
|
|
Packit |
4e8bc4 |
unsigned int count, unsigned int bytes) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ptr;
|
|
Packit |
4e8bc4 |
// FIXME: validate page_id in bounds
|
|
Packit |
4e8bc4 |
store_page *p = &e->pages[page_id];
|
|
Packit |
4e8bc4 |
int ret = 0;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&p->mutex);
|
|
Packit |
4e8bc4 |
if (!p->closed && p->version == page_version) {
|
|
Packit |
4e8bc4 |
if (p->bytes_used >= bytes) {
|
|
Packit |
4e8bc4 |
p->bytes_used -= bytes;
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
p->bytes_used = 0;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
if (p->obj_count >= count) {
|
|
Packit |
4e8bc4 |
p->obj_count -= count;
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
p->obj_count = 0; // caller has bad accounting?
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
STAT_L(e);
|
|
Packit |
4e8bc4 |
e->stats.bytes_used -= bytes;
|
|
Packit |
4e8bc4 |
e->stats.objects_used -= count;
|
|
Packit |
4e8bc4 |
STAT_UL(e);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
if (p->obj_count == 0) {
|
|
Packit |
4e8bc4 |
extstore_run_maint(e);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
ret = -1;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
return ret;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ptr;
|
|
Packit |
4e8bc4 |
store_page *p = &e->pages[page_id];
|
|
Packit |
4e8bc4 |
int ret = 0;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&p->mutex);
|
|
Packit |
4e8bc4 |
if (p->version != page_version)
|
|
Packit |
4e8bc4 |
ret = -1;
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
return ret;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* allows a compactor to say "we're done with this page, kill it. */
|
|
Packit |
4e8bc4 |
void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version) {
|
|
Packit |
4e8bc4 |
store_engine *e = (store_engine *)ptr;
|
|
Packit |
4e8bc4 |
store_page *p = &e->pages[page_id];
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&p->mutex);
|
|
Packit |
4e8bc4 |
if (!p->closed && p->version == page_version) {
|
|
Packit |
4e8bc4 |
p->closed = true;
|
|
Packit |
4e8bc4 |
extstore_run_maint(e);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* Finds an attached wbuf that can satisfy the read.
|
|
Packit |
4e8bc4 |
* Since wbufs can potentially be flushed to disk out of order, they are only
|
|
Packit |
4e8bc4 |
* removed as the head of the list successfully flushes to disk.
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
// call with *p locked
|
|
Packit |
4e8bc4 |
// FIXME: protect from reading past wbuf
|
|
Packit |
4e8bc4 |
static inline int _read_from_wbuf(store_page *p, obj_io *io) {
|
|
Packit |
4e8bc4 |
_store_wbuf *wbuf = p->wbuf;
|
|
Packit |
4e8bc4 |
assert(wbuf != NULL);
|
|
Packit |
4e8bc4 |
assert(io->offset < p->written + wbuf->size);
|
|
Packit |
4e8bc4 |
if (io->iov == NULL) {
|
|
Packit |
4e8bc4 |
memcpy(io->buf, wbuf->buf + (io->offset - wbuf->offset), io->len);
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
int x;
|
|
Packit |
4e8bc4 |
unsigned int off = io->offset - wbuf->offset;
|
|
Packit |
4e8bc4 |
// need to loop fill iovecs
|
|
Packit |
4e8bc4 |
for (x = 0; x < io->iovcnt; x++) {
|
|
Packit |
4e8bc4 |
struct iovec *iov = &io->iov[x];
|
|
Packit |
4e8bc4 |
memcpy(iov->iov_base, wbuf->buf + off, iov->iov_len);
|
|
Packit |
4e8bc4 |
off += iov->iov_len;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
return io->len;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* engine IO thread; takes engine context
|
|
Packit |
4e8bc4 |
* manage writes/reads
|
|
Packit |
4e8bc4 |
* runs IO callbacks inline after each IO
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
// FIXME: protect from reading past page
|
|
Packit |
4e8bc4 |
static void *extstore_io_thread(void *arg) {
|
|
Packit |
4e8bc4 |
store_io_thread *me = (store_io_thread *)arg;
|
|
Packit |
4e8bc4 |
store_engine *e = me->e;
|
|
Packit |
4e8bc4 |
while (1) {
|
|
Packit |
4e8bc4 |
obj_io *io_stack = NULL;
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&me->mutex);
|
|
Packit |
4e8bc4 |
if (me->queue == NULL) {
|
|
Packit |
4e8bc4 |
pthread_cond_wait(&me->cond, &me->mutex);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// Pull and disconnect a batch from the queue
|
|
Packit |
4e8bc4 |
if (me->queue != NULL) {
|
|
Packit |
4e8bc4 |
int i;
|
|
Packit |
4e8bc4 |
obj_io *end = NULL;
|
|
Packit |
4e8bc4 |
io_stack = me->queue;
|
|
Packit |
4e8bc4 |
end = io_stack;
|
|
Packit |
4e8bc4 |
for (i = 1; i < e->io_depth; i++) {
|
|
Packit |
4e8bc4 |
if (end->next) {
|
|
Packit |
4e8bc4 |
end = end->next;
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
me->depth -= i;
|
|
Packit |
4e8bc4 |
me->queue = end->next;
|
|
Packit |
4e8bc4 |
end->next = NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&me->mutex);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
obj_io *cur_io = io_stack;
|
|
Packit |
4e8bc4 |
while (cur_io) {
|
|
Packit |
4e8bc4 |
// We need to note next before the callback in case the obj_io
|
|
Packit |
4e8bc4 |
// gets reused.
|
|
Packit |
4e8bc4 |
obj_io *next = cur_io->next;
|
|
Packit |
4e8bc4 |
int ret = 0;
|
|
Packit |
4e8bc4 |
int do_op = 1;
|
|
Packit |
4e8bc4 |
store_page *p = &e->pages[cur_io->page_id];
|
|
Packit |
4e8bc4 |
// TODO: loop if not enough bytes were read/written.
|
|
Packit |
4e8bc4 |
switch (cur_io->mode) {
|
|
Packit |
4e8bc4 |
case OBJ_IO_READ:
|
|
Packit |
4e8bc4 |
// Page is currently open. deal if read is past the end.
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&p->mutex);
|
|
Packit |
4e8bc4 |
if (!p->free && !p->closed && p->version == cur_io->page_version) {
|
|
Packit |
4e8bc4 |
if (p->active && cur_io->offset >= p->written) {
|
|
Packit |
4e8bc4 |
ret = _read_from_wbuf(p, cur_io);
|
|
Packit |
4e8bc4 |
do_op = 0;
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
p->refcount++;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
STAT_L(e);
|
|
Packit |
4e8bc4 |
e->stats.bytes_read += cur_io->len;
|
|
Packit |
4e8bc4 |
e->stats.objects_read++;
|
|
Packit |
4e8bc4 |
STAT_UL(e);
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
do_op = 0;
|
|
Packit |
4e8bc4 |
ret = -2; // TODO: enum in IO for status?
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
if (do_op) {
|
|
Packit |
4e8bc4 |
#ifdef __APPLE__
|
|
Packit |
4e8bc4 |
ret = lseek(p->fd, SEEK_SET, p->offset + cur_io->offset);
|
|
Packit |
4e8bc4 |
if (ret >= 0) {
|
|
Packit |
4e8bc4 |
if (cur_io->iov == NULL) {
|
|
Packit |
4e8bc4 |
ret = read(p->fd, cur_io->buf, cur_io->len);
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
ret = readv(p->fd, cur_io->iov, cur_io->iovcnt);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
#else
|
|
Packit |
4e8bc4 |
if (cur_io->iov == NULL) {
|
|
Packit |
4e8bc4 |
ret = pread(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
ret = preadv(p->fd, cur_io->iov, cur_io->iovcnt, p->offset + cur_io->offset);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
#endif
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
case OBJ_IO_WRITE:
|
|
Packit |
4e8bc4 |
do_op = 0;
|
|
Packit |
4e8bc4 |
// FIXME: Should hold refcount during write. doesn't
|
|
Packit |
4e8bc4 |
// currently matter since page can't free while active.
|
|
Packit |
4e8bc4 |
ret = pwrite(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
if (ret == 0) {
|
|
Packit |
4e8bc4 |
E_DEBUG("read returned nothing\n");
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
#ifdef EXTSTORE_DEBUG
|
|
Packit |
4e8bc4 |
if (ret == -1) {
|
|
Packit |
4e8bc4 |
perror("read/write op failed");
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
#endif
|
|
Packit |
4e8bc4 |
cur_io->cb(e, cur_io, ret);
|
|
Packit |
4e8bc4 |
if (do_op) {
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&p->mutex);
|
|
Packit |
4e8bc4 |
p->refcount--;
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
cur_io = next;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// call with *p locked.
|
|
Packit |
4e8bc4 |
static void _free_page(store_engine *e, store_page *p) {
|
|
Packit |
4e8bc4 |
store_page *tmp = NULL;
|
|
Packit |
4e8bc4 |
store_page *prev = NULL;
|
|
Packit |
4e8bc4 |
E_DEBUG("EXTSTORE: freeing page %u\n", p->id);
|
|
Packit |
4e8bc4 |
STAT_L(e);
|
|
Packit |
4e8bc4 |
e->stats.objects_used -= p->obj_count;
|
|
Packit |
4e8bc4 |
e->stats.bytes_used -= p->bytes_used;
|
|
Packit |
4e8bc4 |
e->stats.page_reclaims++;
|
|
Packit |
4e8bc4 |
STAT_UL(e);
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&e->mutex);
|
|
Packit |
4e8bc4 |
// unlink page from bucket list
|
|
Packit |
4e8bc4 |
tmp = e->page_buckets[p->bucket];
|
|
Packit |
4e8bc4 |
while (tmp) {
|
|
Packit |
4e8bc4 |
if (tmp == p) {
|
|
Packit |
4e8bc4 |
if (prev) {
|
|
Packit |
4e8bc4 |
prev->next = tmp->next;
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
e->page_buckets[p->bucket] = tmp->next;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
tmp->next = NULL;
|
|
Packit |
4e8bc4 |
break;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
prev = tmp;
|
|
Packit |
4e8bc4 |
tmp = tmp->next;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
// reset most values
|
|
Packit |
4e8bc4 |
p->version = 0;
|
|
Packit |
4e8bc4 |
p->obj_count = 0;
|
|
Packit |
4e8bc4 |
p->bytes_used = 0;
|
|
Packit |
4e8bc4 |
p->allocated = 0;
|
|
Packit |
4e8bc4 |
p->written = 0;
|
|
Packit |
4e8bc4 |
p->bucket = 0;
|
|
Packit |
4e8bc4 |
p->active = false;
|
|
Packit |
4e8bc4 |
p->closed = false;
|
|
Packit |
4e8bc4 |
p->free = true;
|
|
Packit |
4e8bc4 |
// add to page stack
|
|
Packit |
4e8bc4 |
// TODO: free_page_buckets first class and remove redundancy?
|
|
Packit |
4e8bc4 |
if (p->free_bucket != 0) {
|
|
Packit |
4e8bc4 |
p->next = e->free_page_buckets[p->free_bucket];
|
|
Packit |
4e8bc4 |
e->free_page_buckets[p->free_bucket] = p;
|
|
Packit |
4e8bc4 |
} else {
|
|
Packit |
4e8bc4 |
p->next = e->page_freelist;
|
|
Packit |
4e8bc4 |
e->page_freelist = p;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
e->page_free++;
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&e->mutex);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
/* engine maint thread; takes engine context.
|
|
Packit |
4e8bc4 |
* Uses version to ensure oldest possible objects are being evicted.
|
|
Packit |
4e8bc4 |
* Needs interface to inform owner of pages with fewer objects or most space
|
|
Packit |
4e8bc4 |
* free, which can then be actively compacted to avoid eviction.
|
|
Packit |
4e8bc4 |
*
|
|
Packit |
4e8bc4 |
* This gets called asynchronously after every page allocation. Could run less
|
|
Packit |
4e8bc4 |
* often if more pages are free.
|
|
Packit |
4e8bc4 |
*
|
|
Packit |
4e8bc4 |
* Another allocation call is required if an attempted free didn't happen
|
|
Packit |
4e8bc4 |
* due to the page having a refcount.
|
|
Packit |
4e8bc4 |
*/
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// TODO: Don't over-evict pages if waiting on refcounts to drop
|
|
Packit |
4e8bc4 |
static void *extstore_maint_thread(void *arg) {
|
|
Packit |
4e8bc4 |
store_maint_thread *me = (store_maint_thread *)arg;
|
|
Packit |
4e8bc4 |
store_engine *e = me->e;
|
|
Packit |
4e8bc4 |
struct extstore_page_data *pd =
|
|
Packit |
4e8bc4 |
calloc(e->page_count, sizeof(struct extstore_page_data));
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&me->mutex);
|
|
Packit |
4e8bc4 |
while (1) {
|
|
Packit |
4e8bc4 |
int i;
|
|
Packit |
4e8bc4 |
bool do_evict = false;
|
|
Packit |
4e8bc4 |
unsigned int low_page = 0;
|
|
Packit |
4e8bc4 |
uint64_t low_version = ULLONG_MAX;
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
pthread_cond_wait(&me->cond, &me->mutex);
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&e->mutex);
|
|
Packit |
4e8bc4 |
// default freelist requires at least one page free.
|
|
Packit |
4e8bc4 |
// specialized freelists fall back to default once full.
|
|
Packit |
4e8bc4 |
if (e->page_free == 0 || e->page_freelist == NULL) {
|
|
Packit |
4e8bc4 |
do_evict = true;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&e->mutex);
|
|
Packit |
4e8bc4 |
memset(pd, 0, sizeof(struct extstore_page_data) * e->page_count);
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
for (i = 0; i < e->page_count; i++) {
|
|
Packit |
4e8bc4 |
store_page *p = &e->pages[i];
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&p->mutex);
|
|
Packit |
4e8bc4 |
pd[p->id].free_bucket = p->free_bucket;
|
|
Packit |
4e8bc4 |
if (p->active || p->free) {
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
continue;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
if (p->obj_count > 0 && !p->closed) {
|
|
Packit |
4e8bc4 |
pd[p->id].version = p->version;
|
|
Packit |
4e8bc4 |
pd[p->id].bytes_used = p->bytes_used;
|
|
Packit |
4e8bc4 |
pd[p->id].bucket = p->bucket;
|
|
Packit |
4e8bc4 |
// low_version/low_page are only used in the eviction
|
|
Packit |
4e8bc4 |
// scenario. when we evict, it's only to fill the default page
|
|
Packit |
4e8bc4 |
// bucket again.
|
|
Packit |
4e8bc4 |
// TODO: experiment with allowing evicting up to a single page
|
|
Packit |
4e8bc4 |
// for any specific free bucket. this is *probably* required
|
|
Packit |
4e8bc4 |
// since it could cause a load bias on default-only devices?
|
|
Packit |
4e8bc4 |
if (p->free_bucket == 0 && p->version < low_version) {
|
|
Packit |
4e8bc4 |
low_version = p->version;
|
|
Packit |
4e8bc4 |
low_page = i;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
if ((p->obj_count == 0 || p->closed) && p->refcount == 0) {
|
|
Packit |
4e8bc4 |
_free_page(e, p);
|
|
Packit |
4e8bc4 |
// Found a page to free, no longer need to evict.
|
|
Packit |
4e8bc4 |
do_evict = false;
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
if (do_evict && low_version != ULLONG_MAX) {
|
|
Packit |
4e8bc4 |
store_page *p = &e->pages[low_page];
|
|
Packit |
4e8bc4 |
E_DEBUG("EXTSTORE: evicting page [%d] [v: %llu]\n",
|
|
Packit |
4e8bc4 |
p->id, (unsigned long long) p->version);
|
|
Packit |
4e8bc4 |
pthread_mutex_lock(&p->mutex);
|
|
Packit |
4e8bc4 |
if (!p->closed) {
|
|
Packit |
4e8bc4 |
p->closed = true;
|
|
Packit |
4e8bc4 |
STAT_L(e);
|
|
Packit |
4e8bc4 |
e->stats.page_evictions++;
|
|
Packit |
4e8bc4 |
e->stats.objects_evicted += p->obj_count;
|
|
Packit |
4e8bc4 |
e->stats.bytes_evicted += p->bytes_used;
|
|
Packit |
4e8bc4 |
STAT_UL(e);
|
|
Packit |
4e8bc4 |
if (p->refcount == 0) {
|
|
Packit |
4e8bc4 |
_free_page(e, p);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
pthread_mutex_unlock(&p->mutex);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
// copy the page data into engine context so callers can use it from
|
|
Packit |
4e8bc4 |
// the stats lock.
|
|
Packit |
4e8bc4 |
STAT_L(e);
|
|
Packit |
4e8bc4 |
memcpy(e->stats.page_data, pd,
|
|
Packit |
4e8bc4 |
sizeof(struct extstore_page_data) * e->page_count);
|
|
Packit |
4e8bc4 |
STAT_UL(e);
|
|
Packit |
4e8bc4 |
}
|
|
Packit |
4e8bc4 |
|
|
Packit |
4e8bc4 |
return NULL;
|
|
Packit |
4e8bc4 |
}
|