Blob Blame History Raw
/*
 * Copyright 2016, FUJITSU TECHNOLOGY SOLUTIONS GMBH
 * Copyright 2012, Armon Dadgar. All rights reserved.
 * Copyright 2017, Intel Corporation
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *
 *     * Neither the name of the copyright holder nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * ===========================================================================
 *
 *       Filename:  art.c
 *
 *    Description:  implement ART tree using libpmemobj based on libart
 *
 *         Author:  Andreas Bluemle, Dieter Kasper
 *                  Andreas.Bluemle.external@ts.fujitsu.com
 *                  dieter.kasper@ts.fujitsu.com
 *
 *   Organization:  FUJITSU TECHNOLOGY SOLUTIONS GMBH
 * ============================================================================
 */

/*
 * based on https://github.com/armon/libart/src/art.c
 */

#include <assert.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <strings.h>
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <stdbool.h>
#include <fcntl.h>
#include <emmintrin.h>
#include <sys/types.h>
#include "libpmemobj.h"
#include "art.h"

TOID(var_string) null_var_string;
TOID(art_leaf)   null_art_leaf;
TOID(art_node_u) null_art_node_u;

int art_tree_init(PMEMobjpool *pop, int *newpool);
TOID(art_node_u) make_leaf(PMEMobjpool *pop, const unsigned char *key,
			    int key_len, void *value, int val_len);
int fill_leaf(PMEMobjpool *pop, TOID(art_leaf) al, const unsigned char *key,
			    int key_len, void *value, int val_len);
TOID(art_node_u) alloc_node(PMEMobjpool *pop, art_node_type node_type);

TOID(var_string) art_insert(PMEMobjpool *pop, const unsigned char *key,
			    int key_len, void *value, int val_len);
TOID(var_string) art_delete(PMEMobjpool *pop, const unsigned char *key,
			    int key_len);
static TOID(var_string) recursive_insert(PMEMobjpool *pop,
			    TOID(art_node_u) n, TOID(art_node_u) *ref,
			    const unsigned char *key, int key_len,
			    void *value, int val_len, int depth, int *old_val);
static TOID(art_leaf) recursive_delete(PMEMobjpool *pop,
			    TOID(art_node_u) n, TOID(art_node_u) *ref,
			    const unsigned char *key, int key_len, int depth);
static int leaf_matches(TOID(art_leaf) n, const unsigned char *key,
			    int key_len, int depth);
static int longest_common_prefix(TOID(art_leaf) l1, TOID(art_leaf) l2,
			    int depth);
static int prefix_mismatch(TOID(art_node_u) n, unsigned char *key,
			    int key_len, int depth);
#ifdef LIBART_ITER_PREFIX
static int leaf_prefix_matches(TOID(art_leaf) n,
			    const unsigned char *prefix, int prefix_len);
#endif

static TOID(art_leaf) minimum(TOID(art_node_u) n_u);
static TOID(art_leaf) maximum(TOID(art_node_u) n_u);
static void copy_header(art_node *dest, art_node *src);

static void add_child(PMEMobjpool *pop, TOID(art_node_u) n,
			    TOID(art_node_u) *ref, unsigned char c,
			    TOID(art_node_u) child);
static void add_child4(PMEMobjpool *pop, TOID(art_node4) n,
			    TOID(art_node_u) *ref, unsigned char c,
			    TOID(art_node_u) child);
static void add_child16(PMEMobjpool *pop, TOID(art_node16) n,
			    TOID(art_node_u) *ref, unsigned char c,
			    TOID(art_node_u) child);
static void add_child48(PMEMobjpool *pop, TOID(art_node48) n,
			    TOID(art_node_u) *ref, unsigned char c,
			    TOID(art_node_u) child);
static void add_child256(PMEMobjpool *pop, TOID(art_node256) n,
			    TOID(art_node_u) *ref, unsigned char c,
			    TOID(art_node_u) child);
static void remove_child(PMEMobjpool *pop, TOID(art_node_u) n,
			    TOID(art_node_u) *ref, unsigned char c,
			    TOID(art_node_u) *l);
static void remove_child4(PMEMobjpool *pop, TOID(art_node4) n,
			    TOID(art_node_u) *ref, TOID(art_node_u) *l);
static void remove_child16(PMEMobjpool *pop, TOID(art_node16) n,
			    TOID(art_node_u) *ref, TOID(art_node_u) *l);
static void remove_child48(PMEMobjpool *pop, TOID(art_node48) n,
			    TOID(art_node_u) *ref, unsigned char c);
static void remove_child256(PMEMobjpool *pop, TOID(art_node256) n,
			    TOID(art_node_u) *ref, unsigned char c);

static TOID(art_node_u)* find_child(TOID(art_node_u) n, unsigned char c);
static int check_prefix(const art_node *n, const unsigned char *key,
			    int key_len, int depth);
static int leaf_matches(TOID(art_leaf) n, const unsigned char *key,
			    int key_len, int depth);

TOID(art_leaf) art_minimum(TOID(struct art_tree_root) t);
TOID(art_leaf) art_maximum(TOID(struct art_tree_root) t);

#if 0
static void destroy_node(TOID(art_node_u) n_u);
#endif
int art_iter(PMEMobjpool *pop, art_callback cb, void *data);

static void PMEMOIDcopy(PMEMoid *dest, const PMEMoid *src, const int n);
static void PMEMOIDmove(PMEMoid *dest, PMEMoid *src, const int n);

static void
PMEMOIDcopy(PMEMoid *dest, const PMEMoid *src, const int n)
{
	int i;

	for (i = 0; i < n; i++) {
		dest[i] = src[i];
	}
}

static void
PMEMOIDmove(PMEMoid *dest, PMEMoid *src, const int n)
{
	int i;

	if (dest > src) {
		for (i = n - 1; i >= 0; i--) {
			dest[i] = src[i];
		}
	} else {
		for (i = 0; i < n; i++) {
			dest[i] = src[i];
		}
	}
}

TOID(art_node_u)
alloc_node(PMEMobjpool *pop, art_node_type node_type)
{
	TOID(art_node_u)  node;
	TOID(art_node4)   an4;
	TOID(art_node16)  an16;
	TOID(art_node48)  an48;
	TOID(art_node256) an256;
	TOID(art_leaf)    al;

	node = TX_ZNEW(art_node_u);
	D_RW(node)->art_node_type = (uint8_t)node_type;
	switch (node_type) {
	case NODE4:
		an4 = TX_ZNEW(art_node4);
		D_RW(node)->u.an4 = an4;
		break;
	case NODE16:
		an16 = TX_ZNEW(art_node16);
		D_RW(node)->u.an16 = an16;
		break;
	case NODE48:
		an48 = TX_ZNEW(art_node48);
		D_RW(node)->u.an48 = an48;
		break;
	case NODE256:
		an256 = TX_ZNEW(art_node256);
		D_RW(node)->u.an256 = an256;
		break;
	case art_leaf_t:
		al = TX_ZNEW(art_leaf);
		D_RW(node)->u.al = al;
		break;
	default:
		/* invalid node type */
		D_RW(node)->art_node_type = (uint8_t)art_node_types;
		break;
	}

	return node;
}

int
art_tree_init(PMEMobjpool *pop, int *newpool)
{
	int errors = 0;
	TOID(struct art_tree_root) root;

	if (pop == NULL) {
		errors++;
	}

	null_var_string.oid = OID_NULL;
	null_art_leaf.oid = OID_NULL;
	null_art_node_u.oid = OID_NULL;

	if (!errors) {
		TX_BEGIN(pop) {
			root = POBJ_ROOT(pop, struct art_tree_root);
			if (*newpool) {
				TX_ADD(root);
				D_RW(root)->root.oid = OID_NULL;
				D_RW(root)->size = 0;
				*newpool = 0;
			}
		} TX_END
	}

	return errors;
}

#if 0
// Recursively destroys the tree
static void
destroy_node(TOID(art_node_u) n_u)
{
	// Break if null
	if (TOID_IS_NULL(n_u))
		return;

	// Special case leafs
	if (IS_LEAF(D_RO(n_u))) {
		TX_FREE(n_u);
		return;
	}

	// Handle each node type
	int i;
	TOID(art_node4)   an4;
	TOID(art_node16)  an16;
	TOID(art_node48)  an48;
	TOID(art_node256) an256;

	switch (D_RO(n_u)->art_node_type) {
	case NODE4:
		an4 = D_RO(n_u)->u.an4;
		for (i = 0; i < D_RO(an4)->n.num_children; i++) {
			destroy_node(D_RW(an4)->children[i]);
		}
		break;

	case NODE16:
		an16 = D_RO(n_u)->u.an16;
		for (i = 0; i < D_RO(an16)->n.num_children; i++) {
			destroy_node(D_RW(an16)->children[i]);
		}
		break;

	case NODE48:
		an48 = D_RO(n_u)->u.an48;
		for (i = 0; i < D_RO(an48)->n.num_children; i++) {
			destroy_node(D_RW(an48)->children[i]);
		}
		break;

	case NODE256:
		an256 = D_RO(n_u)->u.an256;
		for (i = 0; i < D_RO(an256)->n.num_children; i++) {
			if (!(TOID_IS_NULL(D_RO(an256)->children[i]))) {
				destroy_node(D_RW(an256)->children[i]);
			}
		}
		break;

	default:
		abort();
	}

	// Free ourself on the way up
	TX_FREE(n_u);
}

/*
 * Destroys an ART tree
 * @return 0 on success.
 */
static int
art_tree_destroy(TOID(struct art_tree_root) t)
{
	destroy_node(D_RO(t)->root);
	return 0;
}
#endif

static TOID(art_node_u)*
find_child(TOID(art_node_u) n, unsigned char c)
{
	int i;
	int mask;
	int bitfield;
	TOID(art_node4)   an4;
	TOID(art_node16)  an16;
	TOID(art_node48)  an48;
	TOID(art_node256) an256;

	switch (D_RO(n)->art_node_type) {
	case NODE4:
		an4 = D_RO(n)->u.an4;
		for (i = 0; i < D_RO(an4)->n.num_children; i++) {
			if (D_RO(an4)->keys[i] == c) {
				return &(D_RW(an4)->children[i]);
			}
		}
		break;

	case NODE16: {
		__m128i cmp;
		an16 = D_RO(n)->u.an16;

		// Compare the key to all 16 stored keys
		cmp = _mm_cmpeq_epi8(_mm_set1_epi8(c),
			    _mm_loadu_si128((__m128i *)D_RO(an16)->keys));

		// Use a mask to ignore children that don't exist
		mask = (1 << D_RO(an16)->n.num_children) - 1;
		bitfield = _mm_movemask_epi8(cmp) & mask;

		/*
		 * If we have a match (any bit set) then we can
		 * return the pointer match using ctz to get the index.
		 */
		if (bitfield) {
			return &(D_RW(an16)->children[__builtin_ctz(bitfield)]);
		}
		break;
	}

	case NODE48:
		an48 = D_RO(n)->u.an48;
		i = D_RO(an48)->keys[c];
		if (i) {
			return &(D_RW(an48)->children[i - 1]);
		}
		break;

	case NODE256:
		an256 = D_RO(n)->u.an256;
		if (!TOID_IS_NULL(D_RO(an256)->children[c])) {
			return &(D_RW(an256)->children[c]);
		}
		break;

	default:
		abort();
	}
	return &null_art_node_u;
}

static inline int
min(int a, int b)
{
	return (a < b) ? a : b;
}

/*
 * Returns the number of prefix characters shared between
 * the key and node.
 */
static int
check_prefix(const art_node *n,
	const unsigned char *key, int key_len, int depth)
{
	int max_cmp = min(min(n->partial_len, MAX_PREFIX_LEN), key_len - depth);
	int idx;
	for (idx = 0; idx < max_cmp; idx++) {
		if (n->partial[idx] != key[depth + idx])
			return idx;
	}
	return idx;
}

/*
 * Checks if a leaf matches
 * @return 0 on success.
 */
static int
leaf_matches(TOID(art_leaf) n, const unsigned char *key, int key_len, int depth)
{
	(void) depth;
	// Fail if the key lengths are different
	if (D_RO(D_RO(n)->key)->len != (uint32_t)key_len)
		return 1;

	// Compare the keys starting at the depth
	return memcmp(D_RO(D_RO(n)->key)->s, key, key_len);
}

/*
 * Searches for a value in the ART tree
 * @arg t The tree
 * @arg key The key
 * @arg key_len The length of the key
 * @return NULL if the item was not found, otherwise
 * the value pointer is returned.
 */
TOID(var_string)
art_search(PMEMobjpool *pop, const unsigned char *key, int key_len)
{
	TOID(struct art_tree_root)t = POBJ_ROOT(pop, struct art_tree_root);
	TOID(art_node_u) *child;
	TOID(art_node_u) n = D_RO(t)->root;
	const art_node *n_an;
	int prefix_len;
	int depth = 0;

	while (!TOID_IS_NULL(n)) {
		// Might be a leaf
		if (IS_LEAF(D_RO(n))) {
			// n = LEAF_RAW(n);
			// Check if the expanded path matches
			if (!leaf_matches(D_RO(n)->u.al, key, key_len, depth)) {
				return (D_RO(D_RO(n)->u.al))->value;
			}
			return null_var_string;
		}

		switch (D_RO(n)->art_node_type) {
		case NODE4:    n_an = &(D_RO(D_RO(n)->u.an4)->n);   break;
		case NODE16:   n_an = &(D_RO(D_RO(n)->u.an16)->n);  break;
		case NODE48:   n_an = &(D_RO(D_RO(n)->u.an48)->n);  break;
		case NODE256:  n_an = &(D_RO(D_RO(n)->u.an256)->n); break;
		default:
			return null_var_string;
		}

		// Bail if the prefix does not match
		if (n_an->partial_len) {
			prefix_len = check_prefix(n_an, key, key_len, depth);
			if (prefix_len !=
				    min(MAX_PREFIX_LEN, n_an->partial_len))
				return null_var_string;
			depth = depth + n_an->partial_len;
		}

		// Recursively search
		child = find_child(n, key[depth]);
		if (TOID_IS_NULL(*child)) {
			n.oid = OID_NULL;
		} else {
			n = *child;
		}
		depth++;
	}
	return null_var_string;
}

// Find the minimum leaf under a node
static TOID(art_leaf)
minimum(TOID(art_node_u) n_u)
{
	TOID(art_node4)   an4;
	TOID(art_node16)  an16;
	TOID(art_node48)  an48;
	TOID(art_node256) an256;

	// Handle base cases
	if (TOID_IS_NULL(n_u))
		return null_art_leaf;
	if (IS_LEAF(D_RO(n_u)))
		return D_RO(n_u)->u.al;

	int idx;
	switch (D_RO(n_u)->art_node_type) {
	case NODE4:
		an4 = D_RO(n_u)->u.an4;
		return minimum(D_RO(an4)->children[0]);
	case NODE16:
		an16 = D_RO(n_u)->u.an16;
		return minimum(D_RO(an16)->children[0]);
	case NODE48:
		an48 = D_RO(n_u)->u.an48;
		idx = 0;
		while (!(D_RO(an48)->keys[idx]))
			idx++;
		idx = D_RO(an48)->keys[idx] - 1;
		assert(idx < 48);
		return minimum(D_RO(an48)->children[idx]);
	case NODE256:
		an256 = D_RO(n_u)->u.an256;
		idx = 0;
		while (!(TOID_IS_NULL(D_RO(an256)->children[idx])))
			idx++;
		return minimum(D_RO(an256)->children[idx]);
	default:
		abort();
	}
}

// Find the maximum leaf under a node
static TOID(art_leaf)
maximum(TOID(art_node_u) n_u)
{
	TOID(art_node4)   an4;
	TOID(art_node16)  an16;
	TOID(art_node48)  an48;
	TOID(art_node256) an256;
	const art_node *n_an;

	// Handle base cases
	if (TOID_IS_NULL(n_u))
		return null_art_leaf;
	if (IS_LEAF(D_RO(n_u)))
		return D_RO(n_u)->u.al;

	int idx;
	switch (D_RO(n_u)->art_node_type) {
	case NODE4:
		an4 = D_RO(n_u)->u.an4;
		n_an = &(D_RO(an4)->n);
		return maximum(D_RO(an4)->children[n_an->num_children - 1]);
	case NODE16:
		an16 = D_RO(n_u)->u.an16;
		n_an = &(D_RO(an16)->n);
		return maximum(D_RO(an16)->children[n_an->num_children - 1]);
	case NODE48:
		an48 = D_RO(n_u)->u.an48;
		idx = 255;
		while (!(D_RO(an48)->keys[idx]))
			idx--;
		idx = D_RO(an48)->keys[idx] - 1;
		assert((idx >= 0) && (idx < 48));
		return maximum(D_RO(an48)->children[idx]);
	case NODE256:
		an256 = D_RO(n_u)->u.an256;
		idx = 255;
		while (!(TOID_IS_NULL(D_RO(an256)->children[idx])))
			idx--;
		return maximum(D_RO(an256)->children[idx]);
	default:
		abort();
	}
}

/*
 * Returns the minimum valued leaf
 */
TOID(art_leaf)
art_minimum(TOID(struct art_tree_root) t)
{
	return minimum(D_RO(t)->root);
}

/*
 * Returns the maximum valued leaf
 */
TOID(art_leaf)
art_maximum(TOID(struct art_tree_root) t)
{
	return maximum(D_RO(t)->root);
}

TOID(art_node_u)
make_leaf(PMEMobjpool *pop,
	const unsigned char *key, int key_len, void *value, int val_len)
{
	TOID(art_node_u)newleaf;

	newleaf = alloc_node(pop, art_leaf_t);
	fill_leaf(pop, D_RW(newleaf)->u.al, key, key_len, value, val_len);

	return newleaf;
}

static int
longest_common_prefix(TOID(art_leaf) l1, TOID(art_leaf) l2, int depth)
{
	TOID(var_string) l1_key = D_RO(l1)->key;
	TOID(var_string) l2_key = D_RO(l2)->key;
	int max_cmp;
	int idx;

	max_cmp = min(D_RO(l1_key)->len, D_RO(l2_key)->len) - depth;
	for (idx = 0; idx < max_cmp; idx++) {
		if (D_RO(l1_key)->s[depth + idx] !=
		    D_RO(l2_key)->s[depth + idx])
			return idx;
	}
	return idx;
}

static void
copy_header(art_node *dest, art_node *src)
{
	dest->num_children = src->num_children;
	dest->partial_len = src->partial_len;
	memcpy(dest->partial, src->partial,
	    min(MAX_PREFIX_LEN, src->partial_len));
}

static void
add_child256(PMEMobjpool *pop, TOID(art_node256) n, TOID(art_node_u) *ref,
	unsigned char c, TOID(art_node_u) child)
{
	art_node *n_an;

	(void) ref;

	TX_ADD(n);

	n_an = &(D_RW(n)->n);
	n_an->num_children++;
	D_RW(n)->children[c] = child;
}

static void
add_child48(PMEMobjpool *pop, TOID(art_node48) n, TOID(art_node_u) *ref,
	unsigned char c, TOID(art_node_u) child)
{
	art_node *n_an;

	n_an = &(D_RW(n)->n);
	if (n_an->num_children < 48) {
		int pos = 0;
		TX_ADD(n);
		while (!(TOID_IS_NULL(D_RO(n)->children[pos])))
			pos++;
		D_RW(n)->children[pos] = child;
		D_RW(n)->keys[c] = pos + 1;
		n_an->num_children++;
	} else {
		TOID(art_node_u)  newnode_u = alloc_node(pop, NODE256);
		TOID(art_node256) newnode = D_RO(newnode_u)->u.an256;

		pmemobj_tx_add_range_direct(ref, sizeof(TOID(art_node_u)));

		for (int i = 0; i < 256; i++) {
			if (D_RO(n)->keys[i]) {
				D_RW(newnode)->children[i] =
					D_RO(n)->children[D_RO(n)->keys[i] - 1];
			}
		}
		copy_header(&(D_RW(newnode)->n), n_an);
		*ref = newnode_u;
		TX_FREE(n);
		add_child256(pop, newnode, ref, c, child);
	}
}

static void
add_child16(PMEMobjpool *pop, TOID(art_node16) n, TOID(art_node_u)*ref,
	unsigned char c, TOID(art_node_u) child)
{
	art_node *n_an;

	n_an = &(D_RW(n)->n);
	if (n_an->num_children < 16) {
		__m128i cmp;

		TX_ADD(n);

		// Compare the key to all 16 stored keys
		cmp = _mm_cmplt_epi8(_mm_set1_epi8(c),
			    _mm_loadu_si128((__m128i *)(D_RO(n)->keys)));

		// Use a mask to ignore children that don't exist
		unsigned mask = (1 << n_an->num_children) - 1;
		unsigned bitfield = _mm_movemask_epi8(cmp) & mask;

		// Check if less than any
		unsigned idx;
		if (bitfield) {
			idx = __builtin_ctz(bitfield);
			memmove(&(D_RW(n)->keys[idx + 1]),
			    &(D_RO(n)->keys[idx]),
			    n_an->num_children - idx);
			PMEMOIDmove(&(D_RW(n)->children[idx + 1].oid),
			    &(D_RW(n)->children[idx].oid),
			    n_an->num_children - idx);
		} else {
			idx = n_an->num_children;
		}

		// Set the child
		D_RW(n)->keys[idx] = c;
		D_RW(n)->children[idx] = child;
		n_an->num_children++;

	} else {
		TOID(art_node_u) newnode_u = alloc_node(pop, NODE48);
		TOID(art_node48) newnode = D_RO(newnode_u)->u.an48;

		// Copy the child pointers and populate the key map
		PMEMOIDcopy(&(D_RW(newnode)->children[0].oid),
		    &(D_RO(n)->children[0].oid),
		    n_an->num_children);
		for (int i = 0; i < n_an->num_children; i++) {
			D_RW(newnode)->keys[D_RO(n)->keys[i]] = i + 1;
		}
		copy_header(&(D_RW(newnode))->n, n_an);
		*ref = newnode_u;
		TX_FREE(n);
		add_child48(pop, newnode, ref, c, child);
	}
}

static void
add_child4(PMEMobjpool *pop, TOID(art_node4) n, TOID(art_node_u) *ref,
	unsigned char c, TOID(art_node_u) child)
{
	art_node *n_an;

	n_an = &(D_RW(n)->n);
	if (n_an->num_children < 4) {
		int idx;
		TX_ADD(n);
		for (idx = 0; idx < n_an->num_children; idx++) {
			if (c < D_RO(n)->keys[idx]) break;
		}

		// Shift to make room
		memmove(D_RW(n)->keys + idx + 1, D_RO(n)->keys + idx,
		    n_an->num_children - idx);
		assert((idx + 1) < 4);
		PMEMOIDmove(&(D_RW(n)->children[idx + 1].oid),
		    &(D_RW(n)->children[idx].oid),
		    n_an->num_children - idx);

		// Insert element
		D_RW(n)->keys[idx] = c;
		D_RW(n)->children[idx] = child;
		n_an->num_children++;
	} else {
		TOID(art_node_u) newnode_u = alloc_node(pop, NODE16);
		TOID(art_node16) newnode = D_RO(newnode_u)->u.an16;

		pmemobj_tx_add_range_direct(ref, sizeof(TOID(art_node_u)));

		// Copy the child pointers and the key map
		PMEMOIDcopy(&(D_RW(newnode)->children[0].oid),
		    &(D_RO(n)->children[0].oid), n_an->num_children);
		memcpy(D_RW(newnode)->keys, D_RO(n)->keys, n_an->num_children);
		copy_header(&(D_RW(newnode)->n), n_an);
		*ref = newnode_u;
		TX_FREE(n);
		add_child16(pop, newnode, ref, c, child);
	}
}

static void
add_child(PMEMobjpool *pop, TOID(art_node_u) n, TOID(art_node_u) *ref,
	unsigned char c, TOID(art_node_u) child)
{
	switch (D_RO(n)->art_node_type) {
	case NODE4:
		add_child4(pop, D_RO(n)->u.an4, ref, c, child);
		break;
	case NODE16:
		add_child16(pop, D_RO(n)->u.an16, ref, c, child);
		break;
	case NODE48:
		add_child48(pop, D_RO(n)->u.an48, ref, c, child);
		break;
	case NODE256:
		add_child256(pop, D_RO(n)->u.an256, ref, c, child);
		break;
	default:
		abort();
	}
}

static int
prefix_mismatch(TOID(art_node_u) n, unsigned char *key, int key_len, int depth)
{
	const art_node *n_an;
	int max_cmp;
	int idx;

	switch (D_RO(n)->art_node_type) {
	case NODE4:    n_an = &(D_RO(D_RO(n)->u.an4)->n);   break;
	case NODE16:   n_an = &(D_RO(D_RO(n)->u.an16)->n);  break;
	case NODE48:   n_an = &(D_RO(D_RO(n)->u.an48)->n);  break;
	case NODE256:  n_an = &(D_RO(D_RO(n)->u.an256)->n); break;
	default: return 0;
	}
	max_cmp = min(min(MAX_PREFIX_LEN, n_an->partial_len), key_len - depth);
	for (idx = 0; idx < max_cmp; idx++) {
		if (n_an->partial[idx] != key[depth + idx])
			return idx;
	}

	// If the prefix is short we can avoid finding a leaf
	if (n_an->partial_len > MAX_PREFIX_LEN) {
		// Prefix is longer than what we've checked, find a leaf
		TOID(art_leaf) l = minimum(n);
		max_cmp = min(D_RO(D_RO(l)->key)->len, key_len) - depth;
		for (; idx < max_cmp; idx++) {
			if (D_RO(D_RO(l)->key)->s[idx + depth] !=
			    key[depth + idx])
				return idx;
		}
	}
	return idx;
}

static TOID(var_string)
recursive_insert(PMEMobjpool *pop, TOID(art_node_u) n, TOID(art_node_u) *ref,
	const unsigned char *key, int key_len,
	void *value, int val_len, int depth, int *old)
{
	art_node *n_an;
	TOID(var_string) retval;

	// If we are at a NULL node, inject a leaf
	if (TOID_IS_NULL(n)) {
		*ref = make_leaf(pop, key, key_len, value, val_len);
		TX_ADD(*ref);
		SET_LEAF(D_RW(*ref));
		retval = null_var_string;
		return retval;
	}

	// If we are at a leaf, we need to replace it with a node
	if (IS_LEAF(D_RO(n))) {
		TOID(art_leaf)l = D_RO(n)->u.al;

		// Check if we are updating an existing value
		if (!leaf_matches(l, key, key_len, depth)) {
			*old = 1;
			retval = D_RO(l)->value;
			TX_ADD(D_RW(l)->value);
			COPY_BLOB(D_RW(l)->value, value, val_len);
			return retval;
		}

		// New value, we must split the leaf into a node4
		pmemobj_tx_add_range_direct(ref,
		    sizeof(TOID(art_node_u)));
		TOID(art_node_u) newnode_u = alloc_node(pop, NODE4);
		TOID(art_node4)  newnode = D_RO(newnode_u)->u.an4;
		art_node *newnode_n = &(D_RW(newnode)->n);

		// Create a new leaf

		TOID(art_node_u) l2_u =
		    make_leaf(pop, key, key_len, value, val_len);
		TOID(art_leaf) l2 = D_RO(l2_u)->u.al;

		// Determine longest prefix
		int longest_prefix =
		    longest_common_prefix(l, l2, depth);
		newnode_n->partial_len = longest_prefix;
		memcpy(newnode_n->partial, key + depth,
		    min(MAX_PREFIX_LEN, longest_prefix));
		// Add the leafs to the newnode node4
		*ref = newnode_u;
		add_child4(pop, newnode, ref,
		    D_RO(D_RO(l)->key)->s[depth + longest_prefix],
		    n);
		add_child4(pop, newnode, ref,
		    D_RO(D_RO(l2)->key)->s[depth + longest_prefix],
		    l2_u);
		return null_var_string;
	}

	// Check if given node has a prefix
	switch (D_RO(n)->art_node_type) {
	case NODE4:   n_an = &(D_RW(D_RW(n)->u.an4)->n); break;
	case NODE16:  n_an = &(D_RW(D_RW(n)->u.an16)->n); break;
	case NODE48:  n_an = &(D_RW(D_RW(n)->u.an48)->n); break;
	case NODE256: n_an = &(D_RW(D_RW(n)->u.an256)->n); break;
	default: abort();
	}
	if (n_an->partial_len) {
		// Determine if the prefixes differ, since we need to split
		int prefix_diff =
		    prefix_mismatch(n, (unsigned char *)key, key_len, depth);
		if ((uint32_t)prefix_diff >= n_an->partial_len) {
			depth += n_an->partial_len;
			goto RECURSE_SEARCH;
		}

		// Create a new node
		pmemobj_tx_add_range_direct(ref,
		    sizeof(TOID(art_node_u)));
		pmemobj_tx_add_range_direct(n_an, sizeof(art_node));
		TOID(art_node_u) newnode_u = alloc_node(pop, NODE4);
		TOID(art_node4)  newnode = D_RO(newnode_u)->u.an4;
		art_node *newnode_n = &(D_RW(newnode)->n);

		*ref = newnode_u;
		newnode_n->partial_len = prefix_diff;
		memcpy(newnode_n->partial, n_an->partial,
		    min(MAX_PREFIX_LEN, prefix_diff));

		// Adjust the prefix of the old node
		if (n_an->partial_len <= MAX_PREFIX_LEN) {
			add_child4(pop, newnode, ref,
			    n_an->partial[prefix_diff], n);
			n_an->partial_len -= (prefix_diff + 1);
			memmove(n_an->partial,
			    n_an->partial + prefix_diff + 1,
			    min(MAX_PREFIX_LEN, n_an->partial_len));
		} else {
			unsigned char *dst;
			const unsigned char *src;
			size_t len;

			n_an->partial_len -= (prefix_diff + 1);
			TOID(art_leaf) l = minimum(n);
			add_child4(pop, newnode, ref,
			    D_RO(D_RO(l)->key)->s[depth + prefix_diff],
			    n);
			dst = n_an->partial;
			src =
		    &(D_RO(D_RO(l)->key)->s[depth + prefix_diff + 1 ]);
			len = min(MAX_PREFIX_LEN, n_an->partial_len);

			memcpy(dst, src, len);
		}

		// Insert the new leaf
		TOID(art_node_u) l =
		    make_leaf(pop, key, key_len, value, val_len);
		SET_LEAF(D_RW(l));
		add_child4(pop, newnode, ref, key[depth + prefix_diff], l);
		return null_var_string;
	}

RECURSE_SEARCH:;

	// Find a child to recurse to
	TOID(art_node_u) *child = find_child(n, key[depth]);
	if (!TOID_IS_NULL(*child)) {
		return recursive_insert(pop, *child, child,
			    key, key_len, value, val_len, depth + 1, old);
	}

	// No child, node goes within us
	TOID(art_node_u) l =
	    make_leaf(pop, key, key_len, value, val_len);
	SET_LEAF(D_RW(l));
	add_child(pop, n, ref, key[depth], l);
	retval = null_var_string;

	return retval;
}

/*
 * Returns the size of the ART tree
 */
uint64_t
art_size(PMEMobjpool *pop)
{
	TOID(struct art_tree_root) root;
	root = POBJ_ROOT(pop, struct art_tree_root);
	return D_RO(root)->size;
}

/*
 * Inserts a new value into the ART tree
 * @arg t The tree
 * @arg key The key
 * @arg key_len The length of the key
 * @arg value Opaque value.
 * @return NULL if the item was newly inserted, otherwise
 * the old value pointer is returned.
 */
TOID(var_string)
art_insert(PMEMobjpool *pop,
	const unsigned char *key, int key_len, void *value, int val_len)
{
	int old_val = 0;
	TOID(var_string) old;
	TOID(struct art_tree_root) root;

	TX_BEGIN(pop) {
		root = POBJ_ROOT(pop, struct art_tree_root);
		TX_ADD(root);

		old = recursive_insert(pop, D_RO(root)->root,
			    &(D_RW(root)->root),
			    (const unsigned char *)key, key_len,
			    value, val_len, 0, &old_val);
		if (!old_val)
			D_RW(root)->size++;
	} TX_ONABORT {
		abort();
	} TX_END

	return old;
}

static void
remove_child256(PMEMobjpool *pop,
	TOID(art_node256) n, TOID(art_node_u) *ref, unsigned char c)
{
	art_node *n_an = &(D_RW(n)->n);

	TX_ADD(n);

	D_RW(n)->children[c].oid = OID_NULL;
	n_an->num_children--;

	// Resize to a node48 on underflow, not immediately to prevent
	// trashing if we sit on the 48/49 boundary
	if (n_an->num_children == 37) {
		TOID(art_node_u) newnode_u = alloc_node(pop, NODE48);
		TOID(art_node48) newnode_an48 = D_RO(newnode_u)->u.an48;

		pmemobj_tx_add_range_direct(ref, sizeof(TOID(art_node_u)));

		*ref = newnode_u;
		copy_header(&(D_RW(newnode_an48)->n), n_an);

		int pos = 0;
		for (int i = 0; i < 256; i++) {
			if (!TOID_IS_NULL(D_RO(n)->children[i])) {
				assert(pos < 48);
				D_RW(newnode_an48)->children[pos] =
				    D_RO(n)->children[i];
				D_RW(newnode_an48)->keys[i] = pos + 1;
				pos++;
			}
		}
		TX_FREE(n);
	}
}

static void
remove_child48(PMEMobjpool *pop,
	TOID(art_node48) n, TOID(art_node_u) *ref, unsigned char c)
{
	int pos = D_RO(n)->keys[c];
	art_node *n_an = &(D_RW(n)->n);

	TX_ADD(n);

	D_RW(n)->keys[c] = 0;
	D_RW(n)->children[pos - 1].oid = OID_NULL;
	n_an->num_children--;

	if (n_an->num_children == 12) {
		TOID(art_node_u) newnode_u = alloc_node(pop, NODE16);
		TOID(art_node16) newnode_an16 = D_RO(newnode_u)->u.an16;

		pmemobj_tx_add_range_direct(ref, sizeof(TOID(art_node_u)));

		*ref = newnode_u;
		copy_header(&(D_RW(newnode_an16)->n), n_an);

		int child = 0;
		for (int i = 0; i < 256; i++) {
			pos = D_RO(n)->keys[i];
			if (pos) {
				assert(child < 16);
				D_RW(newnode_an16)->keys[child] = i;
				D_RW(newnode_an16)->children[child] =
				    D_RO(n)->children[pos - 1];
				child++;
			}
		}
		TX_FREE(n);
	}
}

static void
remove_child16(PMEMobjpool *pop,
	TOID(art_node16) n, TOID(art_node_u) *ref, TOID(art_node_u) *l)
{
	int pos = l - &(D_RO(n)->children[0]);
	uint8_t num_children = ((D_RW(n)->n).num_children);

	TX_ADD(n);

	memmove(D_RW(n)->keys + pos, D_RO(n)->keys + pos + 1,
	    num_children - 1 - pos);
	memmove(D_RW(n)->children + pos,
	    D_RO(n)->children + pos + 1,
	    (num_children - 1 - pos) * sizeof(void *));
	((D_RW(n)->n).num_children)--;

	if (--num_children == 3) {
		TOID(art_node_u) newnode_u	 = alloc_node(pop, NODE4);
		TOID(art_node4)  newnode_an4 = D_RO(newnode_u)->u.an4;

		pmemobj_tx_add_range_direct(ref, sizeof(TOID(art_node_u)));

		*ref = newnode_u;
		copy_header(&(D_RW(newnode_an4)->n), &(D_RW(n)->n));
		memcpy(D_RW(newnode_an4)->keys, D_RO(n)->keys, 4);
		memcpy(D_RW(newnode_an4)->children,
		    D_RO(n)->children, 4 * sizeof(TOID(art_node_u)));
		TX_FREE(n);
	}
}

static void
remove_child4(PMEMobjpool *pop,
	TOID(art_node4) n, TOID(art_node_u) *ref, TOID(art_node_u) *l)
{
	int pos = l - &(D_RO(n)->children[0]);
	uint8_t *num_children = &((D_RW(n)->n).num_children);

	TX_ADD(n);

	memmove(D_RW(n)->keys + pos, D_RO(n)->keys + pos + 1,
	    *num_children - 1 - pos);
	memmove(D_RW(n)->children + pos, D_RO(n)->children + pos + 1,
	    (*num_children - 1 - pos) * sizeof(void *));
	(*num_children)--;

	// Remove nodes with only a single child
	if (*num_children == 1) {
		TOID(art_node_u) child_u = D_RO(n)->children[0];
		art_node *child = &(D_RW(D_RW(child_u)->u.an4)->n);

		pmemobj_tx_add_range_direct(ref, sizeof(TOID(art_node_u)));

		if (!IS_LEAF(D_RO(child_u))) {
			// Concatenate the prefixes
			int prefix = (D_RW(n)->n).partial_len;
			if (prefix < MAX_PREFIX_LEN) {
				(D_RW(n)->n).partial[prefix] =
				    D_RO(n)->keys[0];
				prefix++;
			}
			if (prefix < MAX_PREFIX_LEN) {
				int sub_prefix = min(child->partial_len,
				    MAX_PREFIX_LEN - prefix);
				memcpy((D_RW(n)->n).partial + prefix,
				    child->partial, sub_prefix);
				prefix += sub_prefix;
			}

			// Store the prefix in the child
			memcpy(child->partial,
			    (D_RO(n)->n).partial, min(prefix, MAX_PREFIX_LEN));
			child->partial_len += (D_RO(n)->n).partial_len + 1;
		}
		*ref = child_u;
		TX_FREE(n);
	}
}

static void
remove_child(PMEMobjpool *pop,
	TOID(art_node_u) n, TOID(art_node_u) *ref,
	unsigned char c, TOID(art_node_u) *l)
{
	switch (D_RO(n)->art_node_type) {
	case NODE4:
		return remove_child4(pop,   D_RO(n)->u.an4,   ref, l);
	case NODE16:
		return remove_child16(pop,  D_RO(n)->u.an16,  ref, l);
	case NODE48:
		return remove_child48(pop,  D_RO(n)->u.an48,  ref, c);
	case NODE256:
		return remove_child256(pop, D_RO(n)->u.an256, ref, c);
	default:
		abort();
	}
}

static TOID(art_leaf)
recursive_delete(PMEMobjpool *pop,
	TOID(art_node_u) n, TOID(art_node_u) *ref,
	const unsigned char *key, int key_len, int depth)
{
	const art_node *n_an;

	// Search terminated
	if (TOID_IS_NULL(n))
		return null_art_leaf;

	// Handle hitting a leaf node
	if (IS_LEAF(D_RO(n))) {
		TOID(art_leaf) l = D_RO(n)->u.al;
		if (!leaf_matches(l, key, key_len, depth)) {
			*ref = null_art_node_u;
			return l;
		}
		return null_art_leaf;
	}

	// get art_node component
	switch (D_RO(n)->art_node_type) {
	case NODE4:   n_an = &(D_RO(D_RO(n)->u.an4)->n); break;
	case NODE16:  n_an = &(D_RO(D_RO(n)->u.an16)->n); break;
	case NODE48:  n_an = &(D_RO(D_RO(n)->u.an48)->n); break;
	case NODE256: n_an = &(D_RO(D_RO(n)->u.an256)->n); break;
	default: abort();
	}

	// Bail if the prefix does not match
	if (n_an->partial_len) {
		int prefix_len = check_prefix(n_an, key, key_len, depth);
		if (prefix_len != min(MAX_PREFIX_LEN, n_an->partial_len)) {
			return null_art_leaf;
		}
		depth = depth + n_an->partial_len;
	}

	// Find child node
	TOID(art_node_u) *child = find_child(n, key[depth]);
	if (TOID_IS_NULL(*child))
		return null_art_leaf;

	// If the child is leaf, delete from this node
	if (IS_LEAF(D_RO(*child))) {
		TOID(art_leaf)l = D_RO(*child)->u.al;
		if (!leaf_matches(l, key, key_len, depth)) {
			remove_child(pop, n, ref, key[depth], child);
			return l;
		}
		return null_art_leaf;
	} else {
		// Recurse
		return recursive_delete(pop, *child, child,
			    (const unsigned char *)key, key_len, depth + 1);
	}
}

/*
 * Deletes a value from the ART tree
 * @arg t The tree
 * @arg key The key
 * @arg key_len The length of the key
 * @return NULL if the item was not found, otherwise
 * the value pointer is returned.
 */
TOID(var_string)
art_delete(PMEMobjpool *pop,
	const unsigned char *key, int key_len)
{
	TOID(struct art_tree_root)root = POBJ_ROOT(pop, struct art_tree_root);
	TOID(art_leaf) l;
	TOID(var_string) retval;

	retval = null_var_string;

	TX_BEGIN(pop) {
		TX_ADD(root);
		l = recursive_delete(pop, D_RO(root)->root,
		    &D_RW(root)->root, key, key_len, 0);
		if (!TOID_IS_NULL(l)) {
			D_RW(root)->size--;
			TOID(var_string)old = D_RO(l)->value;
			TX_FREE(l);
			retval = old;
		}
	} TX_ONABORT {
		abort();
	} TX_END

	return retval;
}


// Recursively iterates over the tree
static int
recursive_iter(TOID(art_node_u)n, art_callback cb, void *data)
{
	const art_node    *n_an;
	TOID(art_node4)   an4;
	TOID(art_node16)  an16;
	TOID(art_node48)  an48;
	TOID(art_node256) an256;
	TOID(art_leaf)    l;
	TOID(var_string)  key;
	TOID(var_string)  value;
	cb_data cbd;

	// Handle base cases
	if (TOID_IS_NULL(n)) {
		return 0;
	}

	cbd.node = n;
	cbd.child_idx = -1;
	if (IS_LEAF(D_RO(n))) {
		l = D_RO(n)->u.al;
		key = D_RO(l)->key;
		value = D_RO(l)->value;
		return cb(&cbd, D_RO(key)->s, D_RO(key)->len,
			    D_RO(value)->s, D_RO(value)->len);
	}

	int idx, res;
	switch (D_RO(n)->art_node_type) {
	case NODE4:
		an4 = D_RO(n)->u.an4;
		n_an = &(D_RO(an4)->n);
		for (int i = 0; i < n_an->num_children; i++) {
			cbd.child_idx = i;
			cb(&cbd, NULL, 0, NULL, 0);
			res = recursive_iter(D_RO(an4)->children[i], cb, data);
			if (res)
				return res;
		}
		break;

	case NODE16:
		an16 = D_RO(n)->u.an16;
		n_an = &(D_RO(an16)->n);
		for (int i = 0; i < n_an->num_children; i++) {
			cbd.child_idx = i;
			cb(&cbd, NULL, 0, NULL, 0);
			res = recursive_iter(D_RO(an16)->children[i], cb, data);
			if (res)
				return res;
		}
		break;

	case NODE48:
		an48 = D_RO(n)->u.an48;
		for (int i = 0; i < 256; i++) {
			idx = D_RO(an48)->keys[i];
			if (!idx)
				continue;

			cbd.child_idx = idx - 1;
			cb(&cbd, NULL, 0, NULL, 0);
			res = recursive_iter(D_RO(an48)->children[idx - 1],
				    cb, data);
			if (res)
				return res;
		}
		break;

	case NODE256:
		an256 = D_RO(n)->u.an256;
		for (int i = 0; i < 256; i++) {
			if (TOID_IS_NULL(D_RO(an256)->children[i]))
				continue;
			cbd.child_idx = i;
			cb(&cbd, NULL, 0, NULL, 0);
			res = recursive_iter(D_RO(an256)->children[i],
				    cb, data);
			if (res)
				return res;
		}
		break;

	default:
		abort();
	}
	return 0;
}

/*
 * Iterates through the entries pairs in the map,
 * invoking a callback for each. The call back gets a
 * key, value for each and returns an integer stop value.
 * If the callback returns non-zero, then the iteration stops.
 * @arg t The tree to iterate over
 * @arg cb The callback function to invoke
 * @arg data Opaque handle passed to the callback
 * @return 0 on success, or the return of the callback.
 */
int
art_iter(PMEMobjpool *pop, art_callback cb, void *data)
{
	TOID(struct art_tree_root) t = POBJ_ROOT(pop, struct art_tree_root);
	return recursive_iter(D_RO(t)->root, cb, data);
}

#ifdef LIBART_ITER_PREFIX /* {  */
/*
 * Checks if a leaf prefix matches
 * @return 0 on success.
 */
static int
leaf_prefix_matches(TOID(art_leaf) n,
	const unsigned char *prefix, int prefix_len)
{
	// Fail if the key length is too short
	if (D_RO(D_RO(n)->key)->len < (uint32_t)prefix_len)
		return 1;

	// Compare the keys
	return memcmp(D_RO(D_RO(n)->key)->s, prefix, prefix_len);
}

/*
 * Iterates through the entries pairs in the map,
 * invoking a callback for each that matches a given prefix.
 * The call back gets a key, value for each and returns an integer stop value.
 * If the callback returns non-zero, then the iteration stops.
 * @arg t The tree to iterate over
 * @arg prefix The prefix of keys to read
 * @arg prefix_len The length of the prefix
 * @arg cb The callback function to invoke
 * @arg data Opaque handle passed to the callback
 * @return 0 on success, or the return of the callback.
 */
int
art_iter_prefix(art_tree *t,
	const unsigned char *key, int key_len, art_callback cb, void *data)
{
	art_node **child;
	art_node *n = t->root;
	int prefix_len, depth = 0;
	while (n) {
		// Might be a leaf
		if (IS_LEAF(n)) {
			n = LEAF_RAW(n);
			// Check if the expanded path matches
			if (!leaf_prefix_matches((art_leaf *)n, key, key_len)) {
				art_leaf *l = (art_leaf *)n;
				return cb(data,
					    (const unsigned char *)l->key,
					    l->key_len, l->value);
			}
			return 0;
		}

		// If the depth matches the prefix, we need to handle this node
		if (depth == key_len) {
			art_leaf *l = minimum(n);
			if (!leaf_prefix_matches(l, key, key_len))
				return recursive_iter(n, cb, data);
			return 0;
		}

		// Bail if the prefix does not match
		if (n->partial_len) {
			prefix_len = prefix_mismatch(n, key, key_len, depth);

			// If there is no match, search is terminated
			if (!prefix_len)
				return 0;

			// If we've matched the prefix, iterate on this node
			else if (depth + prefix_len == key_len) {
				return recursive_iter(n, cb, data);
			}

			// if there is a full match, go deeper
			depth = depth + n->partial_len;
		}

		// Recursively search
		child = find_child(n, key[depth]);
		n = (child) ? *child : NULL;
		depth++;
	}
	return 0;
}
#endif /* } LIBART_ITER_PREFIX */

int
fill_leaf(PMEMobjpool *pop, TOID(art_leaf) al,
	const unsigned char *key, int key_len, void *value, int val_len)
{
	int retval = 0;
	size_t l_key;
	size_t l_val;

	TOID(var_string) Tkey;
	TOID(var_string) Tval;

	l_key = (sizeof(var_string) + key_len);
	l_val = (sizeof(var_string) + val_len);
	Tkey = TX_ALLOC(var_string, l_key);
	Tval = TX_ALLOC(var_string, l_val);

	COPY_BLOB(Tkey, key,   key_len);
	COPY_BLOB(Tval, value, val_len);

	D_RW(al)->key = Tkey;
	D_RW(al)->value = Tval;

	return retval;
}