Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2017 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#include "rd.h"
#include "rdbuf.h"
#include "rdunittest.h"
#include "rdlog.h"
#include "rdcrc32.h"
#include "crc32c.h"


/* Forward declaration: the definition appears further down in this file,
 * but rd_buf_append_segment() needs to call it before that point. */
static size_t
rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p);


/**
 * @brief Release a segment's resources.
 *
 *        The payload is handed to the segment's \c seg_free callback when
 *        both a callback and a payload pointer are set. The segment header
 *        itself is only freed when it carries the RD_SEGMENT_F_FREE flag.
 *
 * @remark The segment is NOT unlinked from any buffer.
 */
static void rd_segment_destroy (rd_segment_t *seg) {
        /* Payload goes first, the header (if owned) last. */
        if (seg->seg_p && seg->seg_free)
                seg->seg_free(seg->seg_p);

        if (!(seg->seg_flags & RD_SEGMENT_F_FREE))
                return;

        rd_free(seg);
}

/**
 * @brief Reset \p seg to all-zeroes and attach \p size bytes of backing
 *        memory at \p mem.
 *
 * @remark The segment is NOT linked to any buffer and its absolute offset
 *         is left at zero.
 */
static void rd_segment_init (rd_segment_t *seg, void *mem, size_t size) {
        memset(seg, 0, sizeof(*seg));

        seg->seg_size = size;
        seg->seg_p    = mem;
}


/**
 * @brief Link \p seg at the tail of \p rbuf and account for it in the
 *        buffer's segment count, length and capacity.
 *
 * @remark The segment's seg_absof is set to the buffer length as it was
 *         before the segment's own seg_of was added.
 * @remark If the buffer has no write position yet, \p seg becomes the write
 *         position; otherwise the write position is re-evaluated through
 *         rd_buf_get_writable0().
 *
 * @returns \p seg
 */
static rd_segment_t *rd_buf_append_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
        TAILQ_INSERT_TAIL(&rbuf->rbuf_segments, seg, seg_link);

        /* The segment starts where the buffer currently ends. */
        seg->seg_absof          = rbuf->rbuf_len;

        rbuf->rbuf_segment_cnt += 1;
        rbuf->rbuf_size        += seg->seg_size;
        rbuf->rbuf_len         += seg->seg_of;

        if (rbuf->rbuf_wpos) {
                /* Existing write position: let the scan advance it to
                 * the first segment with room left. */
                rd_buf_get_writable0(rbuf, NULL, NULL);
                return seg;
        }

        rbuf->rbuf_wpos = seg;
        return seg;
}




/**
 * @brief Carve \p size bytes out of the buffer's pre-allocated extra memory.
 *
 *        The allocation starts at the current extra length rounded up to a
 *        multiple of 8 (FIXME: 32-bit).
 *
 * @returns a pointer into the extra memory, which MUST NOT be freed by the
 *          caller, or NULL if the request does not fit. Nothing is changed
 *          on failure.
 */
static void *extra_alloc (rd_buf_t *rbuf, size_t size) {
        const size_t start = RD_ROUNDUP(rbuf->rbuf_extra_len, 8);
        const size_t end   = start + size;

        if (end > rbuf->rbuf_extra_size)
                return NULL;

        rbuf->rbuf_extra_len = end;

        return rbuf->rbuf_extra + start;
}



/**
 * @brief Obtain an initialized, unlinked segment with \p size bytes of
 *        payload memory (no payload when \p size is 0).
 *
 *        Three sources are tried in order:
 *         1. header and payload both from the buffer's extra memory,
 *         2. header from the extra memory, payload from rd_malloc()
 *            (the payload is then released through rd_free),
 *         3. header and payload from a single rd_malloc(), in which case
 *            the segment is flagged RD_SEGMENT_F_FREE.
 *
 * @remark The segment is NOT appended to the buffer.
 */
static rd_segment_t *
rd_buf_alloc_segment0 (rd_buf_t *rbuf, size_t size) {
        rd_segment_t *seg;
        void *payload = NULL;

        /* 1. Everything from the extra buffer. */
        seg = extra_alloc(rbuf, sizeof(*seg) + size);
        if (seg) {
                if (size > 0)
                        payload = seg + 1;
                rd_segment_init(seg, payload, size);
                return seg;
        }

        /* 2. Only the header from the extra buffer. */
        seg = extra_alloc(rbuf, sizeof(*seg));
        if (seg) {
                if (size > 0)
                        payload = rd_malloc(size);
                rd_segment_init(seg, payload, size);
                if (size > 0)
                        seg->seg_free = rd_free;
                return seg;
        }

        /* 3. Header and payload in one heap allocation. */
        seg = rd_malloc(sizeof(*seg) + size);
        if (seg) {
                if (size > 0)
                        payload = seg + 1;
                rd_segment_init(seg, payload, size);
                seg->seg_flags |= RD_SEGMENT_F_FREE;
                return seg;
        }

        rd_assert(!*"segment allocation failure");

        return seg;
}

/**
 * @brief Allocate a new segment with backing memory and append it to
 *        the buffer.
 *
 *        When \p min_size == \p max_size (and non-zero) exactly that many
 *        bytes are allocated. Otherwise the segment is over-allocated to
 *        the largest of: four segment headers, twice \p min_size, and half
 *        of the buffer's current total capacity.
 *
 * @returns the newly appended segment.
 */
static rd_segment_t *
rd_buf_alloc_segment (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
        rd_segment_t *seg;
        size_t alloc_size = max_size;

        if (max_size == 0 || max_size != min_size) {
                /* Over-allocation permitted. */
                const size_t grow = RD_MAX(min_size * 2,
                                           rbuf->rbuf_size / 2);

                alloc_size = RD_MAX(sizeof(*seg) * 4, grow);
        }

        seg = rd_buf_alloc_segment0(rbuf, alloc_size);
        rd_buf_append_segment(rbuf, seg);

        return seg;
}


/**
 * @brief Make sure \p size contiguous bytes can be written at the
 *        write position.
 *
 *        If the current write segment already has at least \p size bytes
 *        remaining nothing happens. Otherwise a new segment of exactly
 *        \p size bytes is allocated, appended, and made the write position.
 */
void rd_buf_write_ensure_contig (rd_buf_t *rbuf, size_t size) {
        rd_segment_t *cur = rbuf->rbuf_wpos;

        if (cur) {
                void *unused;

                if (rd_segment_write_remains(cur, &unused) >= size)
                        return; /* Fits in the current segment. */

                /* Future optimization:
                 * split the current segment first if its remaining space
                 * is large enough to be worth keeping. */
        }

        rbuf->rbuf_wpos = rd_buf_alloc_segment(rbuf, size, size);
}

/**
 * @brief Keep allocating segments until the buffer reports at least
 *        \p min_size writable bytes.
 *
 *        Each allocation asks for the missing amount; a non-zero
 *        \p max_size is passed on reduced by what is already available,
 *        while 0 is passed on as 0.
 *
 *        Typically used prior to a call to rd_buf_get_write_iov()
 */
void rd_buf_write_ensure (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
        for (;;) {
                const size_t avail = rd_buf_write_remains(rbuf);

                if (avail >= min_size)
                        break;

                rd_buf_alloc_segment(rbuf,
                                     min_size - avail,
                                     max_size ? max_size - avail : 0);
        }
}


/**
 * @returns the segment whose written range [seg_absof, seg_absof + seg_of)
 *          contains absolute offset \p absof, or NULL if \p absof is beyond
 *          the buffer length, if no segment contains it (e.g. \p absof equals
 *          the buffer length), or if the buffer has no segments at all.
 *
 * @remark \p hint is an optional segment where to start looking, such as
 *         the current write or read position. It is ignored when it lies
 *         past \p absof.
 */
rd_segment_t *
rd_buf_get_segment_at_offset (const rd_buf_t *rbuf, const rd_segment_t *hint,
                              size_t absof) {
        const rd_segment_t *seg = hint;

        if (unlikely(absof > rbuf->rbuf_len))
                return NULL;

        /* Only use the hint if there is one and it is not past absof */
        if (!seg || absof < seg->seg_absof)
                seg = TAILQ_FIRST(&rbuf->rbuf_segments);

        /* seg is NULL here for a buffer without segments, so the scan
         * must check it before the first dereference. */
        for ( ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
                if (absof >= seg->seg_absof &&
                    absof < seg->seg_absof + seg->seg_of) {
                        rd_dassert(seg->seg_absof <= rd_buf_len(rbuf));
                        return (rd_segment_t *)seg;
                }
        }

        return NULL;
}


/**
 * @brief Cut \p seg in two at absolute offset \p absof.
 *
 *        \p seg keeps the bytes before \p absof (its written length and
 *        size both become the split offset). A newly allocated segment
 *        takes over the memory from \p absof onwards, including whatever
 *        written bytes and unused capacity lay there.
 *
 *        The new segment is NOT appended to the buffer and its seg_absof
 *        is set to SIZE_MAX (invalid). Its length and size are subtracted
 *        from the buffer totals here since rd_buf_append_segment() adds
 *        them back.
 *
 * @warning MUST ONLY be used on the current write segment (asserted).
 *
 * @warning if a segment is inserted between the two parts it is imperative
 *          that the later segment's absof is corrected.
 *
 * @remark The seg_free callback stays on the original \p seg and is not
 *         copied to the new segment, but the flags are OR:ed into it.
 *
 * @returns the new (tail) segment.
 */
static rd_segment_t *rd_segment_split (rd_buf_t *rbuf, rd_segment_t *seg,
                                       size_t absof) {
        rd_segment_t *tail;
        size_t head_len;  /* bytes staying in seg */
        size_t tail_len;  /* written bytes moving to tail */
        size_t tail_size; /* capacity moving to tail */

        rd_assert(seg == rbuf->rbuf_wpos);
        rd_assert(absof >= seg->seg_absof &&
                  absof <= seg->seg_absof + seg->seg_of);

        head_len  = absof - seg->seg_absof;
        tail_len  = seg->seg_of - head_len;
        tail_size = seg->seg_size - head_len;

        tail = rd_buf_alloc_segment0(rbuf, 0);

        /* The tail references the later part of seg's memory */
        tail->seg_p      = seg->seg_p+head_len;
        tail->seg_of     = tail_len;
        tail->seg_size   = tail_size;
        tail->seg_absof  = SIZE_MAX; /* Invalid */
        tail->seg_flags |= seg->seg_flags;

        /* The head is truncated at the split point */
        seg->seg_of      = head_len;
        seg->seg_size    = head_len;

        /* append_segment() will add the tail's figures back later */
        rbuf->rbuf_len  -= tail_len;
        rbuf->rbuf_size -= tail_size;

        return tail;
}




/**
 * @brief Remove \p seg from \p rbuf and destroy it.
 *
 *        The buffer's segment count, length and capacity are reduced by
 *        the segment's contribution, and the write position is cleared if
 *        it pointed at \p seg.
 */
static void rd_buf_destroy_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
        rd_assert(rbuf->rbuf_segment_cnt > 0 &&
                  rbuf->rbuf_len >= seg->seg_of &&
                  rbuf->rbuf_size >= seg->seg_size);

        TAILQ_REMOVE(&rbuf->rbuf_segments, seg, seg_link);

        rbuf->rbuf_size        -= seg->seg_size;
        rbuf->rbuf_len         -= seg->seg_of;
        rbuf->rbuf_segment_cnt -= 1;
        rd_dassert(rbuf->rbuf_len <= seg->seg_absof);

        if (seg == rbuf->rbuf_wpos)
                rbuf->rbuf_wpos = NULL;

        rd_segment_destroy(seg);
}


/**
 * @brief Release everything owned by \p rbuf: each segment is destroyed
 *        and the extra memory area, if any, is freed.
 *
 * @remark The \p rbuf structure itself is not freed, and the segments are
 *         destroyed without being unlinked or subtracted from the buffer
 *         totals.
 */
void rd_buf_destroy (rd_buf_t *rbuf) {
        rd_segment_t *seg, *next;

#if ENABLE_DEVEL
        /* FIXME: diagnostics currently disabled by the constant 0 */
        if (rbuf->rbuf_len > 0 && 0) {
                size_t overalloc = rbuf->rbuf_size - rbuf->rbuf_len;
                float fill_grade = (float)rbuf->rbuf_len /
                        (float)rbuf->rbuf_size;

                printf("fill grade: %.2f%% (%zu bytes over-allocated)\n",
                       fill_grade * 100.0f, overalloc);
        }
#endif

        /* _SAFE variant: the current segment may be freed by destroy. */
        TAILQ_FOREACH_SAFE(seg, &rbuf->rbuf_segments, seg_link, next)
                rd_segment_destroy(seg);

        if (!rbuf->rbuf_extra)
                return;

        rd_free(rbuf->rbuf_extra);
}


/**
 * @brief Initialize \p rbuf as an empty buffer and, when \p fixed_seg_cnt
 *        is non-zero, pre-allocate an extra memory area large enough for
 *        \p fixed_seg_cnt segment headers (each rounded up to 8 bytes)
 *        plus \p buf_size bytes of payload.
 *
 *        No segments are created here; the extra area is consumed by
 *        later segment allocations to avoid separate heap allocations for
 *        well-known layouts (such as headers, etc).
 *
 * @remark \p buf_size must be 0 when \p fixed_seg_cnt is 0 (asserted).
 */
void rd_buf_init (rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size) {
        memset(rbuf, 0, sizeof(*rbuf));
        TAILQ_INIT(&rbuf->rbuf_segments);

        if (fixed_seg_cnt == 0) {
                assert(!buf_size);
                return;
        }

        /* Room for the fixed segment headers followed by the payload. */
        rbuf->rbuf_extra_size =
                RD_ROUNDUP(sizeof(rd_segment_t), 8) * fixed_seg_cnt +
                buf_size;
        rbuf->rbuf_extra = rd_malloc(rbuf->rbuf_extra_size);
}




/**
 * @brief Locate the first segment, starting at the write position, that
 *        still has room for writing.
 *
 *        The buffer's write position (and \p *segp, when \p segp is
 *        non-NULL) is moved to every segment visited, so it ends up at the
 *        writable segment or, if none has room, at the last one scanned.
 *        \p p is forwarded to rd_segment_write_remains().
 *
 *        After writing through \p p the caller must update the written
 *        length by calling rd_buf_write(rbuf, NULL, written_length)
 *
 * @returns the number of contiguous writable bytes in the found segment,
 *          or 0 if no segment has room (including when there is no write
 *          position, in which case \p *segp is left untouched).
 */
static size_t
rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p) {
        rd_segment_t *seg = rbuf->rbuf_wpos;

        while (seg) {
                const size_t len = rd_segment_write_remains(seg, p);

                /* Remember how far the scan got so that future calls
                 * do not have to revisit exhausted segments. */
                rbuf->rbuf_wpos = seg;
                if (segp)
                        *segp = seg;

                if (unlikely(len == 0)) {
                        seg = TAILQ_NEXT(seg, seg_link);
                        continue;
                }

                /* An untouched segment that was appended while earlier
                 * segments still had room may carry an absolute offset
                 * that is lower than the current buffer length:
                 * bring it up to date. */
                if (seg->seg_of == 0 && seg->seg_absof < rbuf->rbuf_len)
                        seg->seg_absof = rbuf->rbuf_len;

                return len;
        }

        return 0;
}

/**
 * @brief Public wrapper around rd_buf_get_writable0() for callers that
 *        have no use for the segment pointer.
 *
 * @returns whatever rd_buf_get_writable0() returns for \p rbuf and \p p.
 */
size_t rd_buf_get_writable (rd_buf_t *rbuf, void **p) {
        /* A NULL segp tells the helper not to report the segment. */
        return rd_buf_get_writable0(rbuf, NULL, p);
}




/**
 * @brief Append \p size bytes from \p payload at the buffer's write
 *        position, spreading the data over as many segments as needed.
 *        Missing space is allocated up front with rd_buf_write_ensure().
 *
 * @returns the buffer length before the write, i.e. the absolute offset
 *          at which the payload starts. This lets the caller come back
 *          later and overwrite the same region (see rd_buf_write_update()),
 *          which makes writes usable as place-holders.
 *
 * @remark If \p payload is NULL nothing is copied and only the write
 *         position and lengths are advanced. In this mode the buffer is
 *         required to already have enough memory, as any segment
 *         allocated from this function would otherwise contribute
 *         uninitialized memory.
 */
size_t rd_buf_write (rd_buf_t *rbuf, const void *payload, size_t size) {
        const size_t initial_absof = rbuf->rbuf_len;
        const char *psrc = (const char *)payload;
        size_t remains;

        /* Pre-allocate segments so the copy loop never runs dry. */
        rd_buf_write_ensure(rbuf, size, 0);

        for (remains = size ; remains > 0 ; ) {
                void *p;
                rd_segment_t *seg;
                size_t segremains = rd_buf_get_writable0(rbuf, &seg, &p);
                size_t wlen = RD_MIN(remains, segremains);

                rd_dassert(seg == rbuf->rbuf_wpos);
                rd_dassert(wlen > 0);
                rd_dassert(seg->seg_p+seg->seg_of <= (char *)p &&
                           (char *)p < seg->seg_p+seg->seg_size);

                if (payload) {
                        memcpy(p, psrc, wlen);
                        psrc += wlen;
                }

                remains        -= wlen;
                rbuf->rbuf_len += wlen;
                seg->seg_of    += wlen;
        }

        rd_assert(remains == 0);

        return initial_absof;
}



/**
 * @brief Append everything that remains to be read in \p slice to \p rbuf.
 *
 * @remark The slice position will be updated (the slice is consumed).
 *
 * @returns the number of bytes written, which is the number of bytes
 *          that remained in the slice.
 */
size_t rd_buf_write_slice (rd_buf_t *rbuf, rd_slice_t *slice) {
        const void *p;
        size_t rlen;
        size_t sum = 0;

        while ((rlen = rd_slice_reader(slice, &p))) {
                /* rd_buf_write() returns the pre-write absolute offset,
                 * not a byte count (and that offset is legitimately 0 for
                 * an empty buffer), so its return value must not be
                 * summed. It always writes the full \p rlen. */
                rd_buf_write(rbuf, p, rlen);
                sum += rlen;
        }

        return sum;
}



/**
 * @brief Overwrite already written bytes of a single segment: copy up to
 *        \p size bytes from \p payload to the position in \p seg that
 *        corresponds to absolute offset \p absof.
 *
 *        Neither the segment's write offset nor any buffer length is
 *        changed. The copy is clamped to the segment's written length.
 *
 * @returns the number of bytes copied, which is less than \p size when
 *          the region extends past the written part of this segment.
 */
static size_t rd_segment_write_update (rd_segment_t *seg, size_t absof,
                                       const void *payload, size_t size) {
        size_t relof;
        size_t wlen;

        rd_dassert(absof >= seg->seg_absof);
        relof = absof - seg->seg_absof;
        rd_assert(relof <= seg->seg_of);

        /* Never go beyond what has been written to this segment. */
        wlen = seg->seg_of - relof;
        if (size < wlen)
                wlen = size;
        rd_dassert(relof + wlen <= seg->seg_of);

        memcpy(seg->seg_p+relof, payload, wlen);

        return wlen;
}



/**
 * @brief Overwrite \p size previously written bytes starting at absolute
 *        offset \p absof with \p payload, WITHOUT changing the total
 *        buffer length. The region may span several segments.
 *
 *        This is used to update a previously written region, such
 *        as updating the header length.
 *
 * @remark \p absof must fall inside a segment's written range (asserted).
 *
 * @returns the number of bytes written (\p size).
 */
size_t rd_buf_write_update (rd_buf_t *rbuf, size_t absof,
                            const void *payload, size_t size) {
        const char *psrc = (const char *)payload;
        rd_segment_t *seg;
        size_t of = 0;

        /* Locate the segment holding the first byte to update */
        seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
        rd_assert(seg && *"invalid absolute offset");

        /* Update segment by segment until everything is written */
        while (of < size) {
                size_t wlen;

                rd_assert(seg->seg_absof <= rd_buf_len(rbuf));
                wlen = rd_segment_write_update(seg, absof+of,
                                               psrc+of, size-of);
                of  += wlen;
                seg  = TAILQ_NEXT(seg, seg_link);
        }

        rd_dassert(of == size);

        return of;
}



/**
 * @brief Insert caller-provided memory at the current write position as a
 *        read-only segment, without copying it.
 *
 *        If the current write segment still has room it is split at its
 *        write offset: the pushed segment is appended after the written
 *        part, followed by the unused remainder.
 *
 * @param payload memory to reference.
 * @param size    number of bytes at \p payload; all of them count as
 *                written.
 * @param free_cb stored as the segment's seg_free callback (may be NULL).
 */
void rd_buf_push (rd_buf_t *rbuf, const void *payload, size_t size,
                  void (*free_cb)(void *)) {
        rd_segment_t *cur = rbuf->rbuf_wpos;
        rd_segment_t *tailseg = NULL;
        rd_segment_t *refseg;

        /* Detach the unused remainder of the current segment so the
         * pushed segment can go in between. */
        if (cur && rd_segment_write_remains(cur, NULL) > 0)
                tailseg = rd_segment_split(rbuf, cur,
                                           cur->seg_absof + cur->seg_of);

        refseg = rd_buf_alloc_segment0(rbuf, 0);
        refseg->seg_flags |= RD_SEGMENT_F_RDONLY;
        refseg->seg_free   = free_cb;
        refseg->seg_p      = (char *)payload;
        refseg->seg_of     = size;
        refseg->seg_size   = size;

        rd_buf_append_segment(rbuf, refseg);

        if (!tailseg)
                return;

        rd_buf_append_segment(rbuf, tailseg);
}







/**
 * @brief Rewind the write position to absolute offset \p absof,
 *        truncating the buffer length to that offset.
 *
 * @warning All segments following the one containing \p absof are
 *          destroyed.
 *
 * @returns 0 on success, or -1 if no segment contains \p absof, in which
 *          case the buffer is left unchanged.
 */
int rd_buf_write_seek (rd_buf_t *rbuf, size_t absof) {
        rd_segment_t *seg, *victim;
        size_t relof;

        seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
        if (unlikely(!seg))
                return -1;

        relof = absof - seg->seg_absof;
        if (unlikely(relof > seg->seg_of))
                return -1;

        /* Drop trailing segments back to front so that the accounting
         * checks in destroy_segment() hold at every step.
         * This decrements rbuf_len et.al. */
        victim = TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head);
        while (victim != seg) {
                rd_segment_t *prev = TAILQ_PREV(victim, rd_segment_head,
                                                seg_link);

                rd_buf_destroy_segment(rbuf, victim);
                victim = prev;
        }

        /* The found segment becomes the (truncated) write segment */
        seg->seg_of     = relof;
        rbuf->rbuf_wpos = seg;
        rbuf->rbuf_len  = seg->seg_absof + seg->seg_of;

        rd_assert(rbuf->rbuf_len == absof);

        return 0;
}


/**
 * @brief Describe the writable memory from the buffer's write position
 *        onwards as an array of iovecs. Segments with no room left are
 *        skipped.
 *
 * @param iovs     array to populate, with room for \p iov_max elements.
 * @param iovcntp  will be set to the number of populated \p iovs[]
 * @param iov_max  capacity of \p iovs.
 * @param size_max stop adding segments once this many bytes have been
 *                 collected. Note: the limit is only checked before a
 *                 segment is added, so it may be overshot by the size of
 *                 one segment.
 *
 * @returns the total number of writable bytes described by \p iovs.
 *
 * @remark the write position will NOT be updated.
 */
size_t rd_buf_get_write_iov (const rd_buf_t *rbuf,
                             struct iovec *iovs, size_t *iovcntp,
                             size_t iov_max, size_t size_max) {
        const rd_segment_t *seg = rbuf->rbuf_wpos;
        size_t iovcnt = 0;
        size_t sum = 0;

        while (seg && iovcnt < iov_max && sum < size_max) {
                void *p;
                const size_t len = rd_segment_write_remains(seg, &p);

                if (len != 0) {
                        iovs[iovcnt].iov_base = p;
                        iovs[iovcnt].iov_len  = len;
                        iovcnt++;

                        sum += len;
                }

                seg = TAILQ_NEXT(seg, seg_link);
        }

        *iovcntp = iovcnt;

        return sum;
}











/**
 * @name Slice reader interface
 *
 * @{
 */

/**
 * @brief Set up \p slice to cover \p size bytes of \p rbuf beginning at
 *        relative offset \p rof inside segment \p seg. The read position
 *        is placed at the start of the slice.
 *
 * @returns 0 on success, or -1 (with \p slice untouched) if the requested
 *          range would extend past the buffer length.
 */
int rd_slice_init_seg (rd_slice_t *slice, const rd_buf_t *rbuf,
                           const rd_segment_t *seg, size_t rof, size_t size) {
        const size_t first = seg->seg_absof + rof;

        /* The whole range must lie within the written buffer. */
        if (unlikely(rbuf->rbuf_len < (first + size)))
                return -1;

        slice->buf   = rbuf;
        slice->seg   = seg;
        slice->rof   = rof;
        slice->start = first;
        slice->end   = first + size;

        rd_assert(seg->seg_absof+rof >= slice->start &&
                  seg->seg_absof+rof <= slice->end);

        rd_assert(slice->end <= rd_buf_len(rbuf));

        return 0;
}

/**
 * @brief Set up \p slice to cover \p size bytes of \p rbuf starting at
 *        absolute buffer offset \p absof.
 *
 * @returns 0 on success, or -1 if no segment contains \p absof or if
 *          there are not at least \p size bytes available from there.
 */
int rd_slice_init (rd_slice_t *slice, const rd_buf_t *rbuf,
                   size_t absof, size_t size) {
        const rd_segment_t *seg;
        size_t relof;

        seg = rd_buf_get_segment_at_offset(rbuf, NULL, absof);
        if (unlikely(!seg))
                return -1;

        /* Translate to a segment-relative offset */
        relof = absof - seg->seg_absof;

        return rd_slice_init_seg(slice, rbuf, seg, relof, size);
}

/**
 * @brief Set up \p slice to cover all of \p rbuf, from offset 0 up to the
 *        current buffer length.
 *
 * @remark Initialization failure is treated as fatal (asserted).
 */
void rd_slice_init_full (rd_slice_t *slice, const rd_buf_t *rbuf) {
        const size_t full_len = rd_buf_len(rbuf);
        int r = rd_slice_init(slice, rbuf, 0, full_len);

        rd_assert(r == 0);
}



/**
 * @brief Common implementation of rd_slice_reader() and rd_slice_peeker():
 *        find the next contiguous run of unread bytes in \p slice.
 *
 *        Segments whose written bytes have all been consumed are skipped.
 *        The run is limited both by the segment's written length and by
 *        what remains of the slice.
 *
 * @param p          set to the start of the run when one is found.
 * @param update_pos if non-zero the slice's read position is advanced
 *                   past the returned run.
 *
 * @returns the length of the run, or 0 if the slice is exhausted (in which
 *          case \p *p is not written).
 *
 * @sa rd_slice_reader() rd_slice_peeker()
 */
size_t rd_slice_reader0 (rd_slice_t *slice, const void **p, int update_pos) {
        const rd_segment_t *seg = slice->seg;
        size_t rof = slice->rof;
        size_t rlen;
        size_t seg_avail;
        size_t slice_avail;

        /* Step over fully consumed segments; every following segment
         * is read from its beginning. */
        while (seg && seg->seg_absof+rof < slice->end && seg->seg_of == rof) {
                rof = 0;
                seg = TAILQ_NEXT(seg, seg_link);
        }

        if (unlikely(!seg || seg->seg_absof+rof >= slice->end))
                return 0;

        rd_assert(seg->seg_absof+rof <= slice->end);

        *p = (const void *)(seg->seg_p + rof);

        seg_avail   = seg->seg_of - rof;
        slice_avail = rd_slice_remains(slice);
        rlen        = seg_avail < slice_avail ? seg_avail : slice_avail;

        if (!update_pos)
                return rlen;

        if (slice->seg == seg) {
                /* Still in the same segment: just move forward */
                slice->rof += rlen;
                return rlen;
        }

        /* Moved on to a later segment, which was read from offset 0 */
        rd_assert(seg->seg_absof + rof >= slice->start &&
                  seg->seg_absof + rof+rlen <= slice->end);
        slice->seg = seg;
        slice->rof = rlen;

        return rlen;
}


/**
 * @brief Convenience reader iterator: hand out the next contiguous run of
 *        unread bytes and advance the read position past it.
 *
 *        Call repeatedly from a while loop until it returns 0.
 *
 * @param slice slice to read from, position will be updated.
 * @param p     will be set to the start of the returned run of memory.
 *
 * @returns the number of contiguous bytes available at \p *p, or 0 if
 *          the slice is exhausted.
 */
size_t rd_slice_reader (rd_slice_t *slice, const void **p) {
        const int update_pos = 1;

        return rd_slice_reader0(slice, p, update_pos);
}

/**
 * @brief Same as rd_slice_reader() except that the slice's read position
 *        is left where it was.
 */
size_t rd_slice_peeker (const rd_slice_t *slice, const void **p) {
        const int update_pos = 0;

        /* The shared helper takes a non-const slice but only modifies
         * it when asked to update the position. */
        return rd_slice_reader0((rd_slice_t *)slice, p, update_pos);
}





/**
 * @brief Copy exactly \p size bytes from the slice's read position to
 *        \p dst and advance the read position by \p size.
 *
 *        This is an all-or-nothing read: if fewer than \p size bytes
 *        remain in the slice nothing is copied and the position is not
 *        moved.
 *
 * @returns \p size, or 0 if \p size bytes are not available.
 *
 * @remark If \p dst is NULL the bytes are skipped: only the read position
 *         is updated.
 */
size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size) {
        const size_t orig_end = slice->end;
        char *d = (char *)dst; /* Possibly NULL */
        size_t remains = size;
        size_t rlen;
        const void *p;

        if (unlikely(rd_slice_remains(slice) < size))
                return 0;

        /* Temporarily cap the slice at the current offset + \p size so
         * that the reader stops after exactly \p size bytes. */
        slice->end = rd_slice_abs_offset(slice) + size;

        for (;;) {
                rlen = rd_slice_reader(slice, &p);
                if (!rlen)
                        break;

                rd_dassert(remains >= rlen);
                remains -= rlen;

                if (!dst)
                        continue;

                memcpy(d, p, rlen);
                d += rlen;
        }

        rd_dassert(remains == 0);

        /* Undo the temporary cap */
        slice->end = orig_end;

        return size;
}


/**
 * @brief Copy \p size bytes found at slice-relative offset \p offset to
 *        \p dst. The work is done on a private copy of the slice, so the
 *        caller's read position is not affected.
 *
 * @returns \p size on success, or 0 if seeking to \p offset fails or if
 *          fewer than \p size bytes are available from there.
 */
size_t rd_slice_peek (const rd_slice_t *slice, size_t offset,
                      void *dst, size_t size) {
        rd_slice_t sub = *slice;
        const int seek_rc = rd_slice_seek(&sub, offset);

        return unlikely(seek_rc == -1) ? 0 : rd_slice_read(&sub, dst, size);
}



/**
 * @brief Obtain direct access to \p size contiguous bytes at the current
 *        read position.
 *
 * @returns a pointer into the current segment's memory, or NULL if fewer
 *          than \p size bytes remain in the slice or if they do not all
 *          reside in the current segment. The slice is untouched on NULL.
 *
 * @remark On success the read position is moved past the \p size bytes.
 */
const void *rd_slice_ensure_contig (rd_slice_t *slice, size_t size) {
        void *p;

        if (unlikely(rd_slice_remains(slice) < size))
                return NULL;

        /* The range must not run past this segment's written bytes */
        if (unlikely(slice->rof + size > slice->seg->seg_of))
                return NULL;

        p = slice->seg->seg_p + slice->rof;

        /* NULL destination: skip the bytes without copying */
        rd_slice_read(slice, NULL, size);

        return p;
}



/**
 * @brief Move the slice's read position to \p offset, which is counted
 *        from the start of the slice (not the start of the buffer).
 *
 * @returns 0 on success, or -1 if \p offset is at or beyond the end of
 *          the slice, in which case the position is not changed.
 */
int rd_slice_seek (rd_slice_t *slice, size_t offset) {
        const size_t absof = slice->start + offset;
        const rd_segment_t *seg;

        if (unlikely(absof >= slice->end))
                return -1;

        /* The current segment serves as a search hint */
        seg = rd_buf_get_segment_at_offset(slice->buf, slice->seg, absof);
        rd_assert(seg);

        slice->rof = absof - seg->seg_absof;
        slice->seg = seg;
        rd_assert(seg->seg_absof + slice->rof >= slice->start &&
                  seg->seg_absof + slice->rof <= slice->end);

        return 0;
}


/**
 * @brief Shrink \p slice so that it ends \p size bytes after its start,
 *        storing a copy of the original slice in \p save_slice first.
 *
 *        Pass \p save_slice to rd_slice_widen() afterwards to restore the
 *        original end while keeping the read position reached in the
 *        narrowed slice.
 *
 *        This is useful for reading a sub-slice of a larger slice
 *        without having to pass the lesser length around.
 *
 * @returns 1 on success, or 0 (nothing modified) if \p size extends past
 *          the current end of the slice.
 */
int rd_slice_narrow (rd_slice_t *slice, rd_slice_t *save_slice, size_t size) {
        const size_t new_end = slice->start + size;

        if (unlikely(new_end > slice->end))
                return 0;

        *save_slice = *slice;
        slice->end  = new_end;
        rd_assert(rd_slice_abs_offset(slice) <= slice->end);

        return 1;
}

/**
 * @brief Variant of rd_slice_narrow() where the new length is given as
 *        \p relsize bytes counted from the current read position rather
 *        than from the start of the slice.
 */
int rd_slice_narrow_relative (rd_slice_t *slice, rd_slice_t *save_slice,
                               size_t relsize) {
        const size_t size = rd_slice_offset(slice) + relsize;

        return rd_slice_narrow(slice, save_slice, size);
}


/**
 * @brief Undo a previous rd_slice_narrow(): the end of \p slice is reset
 *        to the end recorded in \p save_slice. Nothing else in \p slice
 *        is touched, so the read position reached while narrowed is kept.
 */
void rd_slice_widen (rd_slice_t *slice, const rd_slice_t *save_slice) {
        const size_t restored_end = save_slice->end;

        slice->end = restored_end;
}


/**
 * @brief Make \p new_slice a copy of \p orig that ends \p size bytes
 *        after the slice start.
 *
 *        This is a side-effect free form of rd_slice_narrow() which is not
 *        to be used with rd_slice_widen().
 *
 * @returns 1 on success, or 0 (with \p new_slice untouched) if \p size
 *          extends past the end of \p orig.
 */
int rd_slice_narrow_copy (const rd_slice_t *orig, rd_slice_t *new_slice,
                          size_t size) {
        const size_t new_end = orig->start + size;

        if (unlikely(new_end > orig->end))
                return 0;

        *new_slice     = *orig;
        new_slice->end = new_end;
        rd_assert(rd_slice_abs_offset(new_slice) <= new_slice->end);

        return 1;
}

/**
 * @brief Variant of rd_slice_narrow_copy() where the new length is given
 *        as \p relsize bytes counted from the current read position of
 *        \p orig rather than from the start of the slice.
 */
int rd_slice_narrow_copy_relative (const rd_slice_t *orig,
                                    rd_slice_t *new_slice,
                                    size_t relsize) {
        const size_t size = rd_slice_offset(orig) + relsize;

        return rd_slice_narrow_copy(orig, new_slice, size);
}





/**
 * @brief Populate \p iovs (which has room for \p iov_max elements) with the
 *        readable segments that follow the slice's current read position.
 *
 * @param iovcntp receives the number of \p iovs[] elements filled in.
 * @param size_max soft cap on the total number of bytes to expose.
 *                 The cap is only checked before a segment is added, so
 *                 the returned total may exceed it by up to one segment.
 *
 * @returns the combined length in bytes of the populated \p iovs[].
 *
 * @remark The caller's slice is not modified: iteration is performed on
 *         a private copy of it.
 */
size_t rd_slice_get_iov (const rd_slice_t *slice,
                         struct iovec *iovs, size_t *iovcntp,
                         size_t iov_max, size_t size_max) {
        rd_slice_t reader = *slice; /* Private cursor, see remark above. */
        size_t cnt = 0;
        size_t total = 0;

        while (total < size_max && cnt < iov_max) {
                const void *seg_p;
                size_t seg_len = rd_slice_reader(&reader, &seg_p);

                if (!seg_len)
                        break; /* Nothing more to read */

                iovs[cnt].iov_base = (void *)seg_p;
                iovs[cnt].iov_len  = seg_len;
                cnt++;

                total += seg_len;
        }

        *iovcntp = cnt;

        return total;
}





/**
 * @brief Compute the CRC32 of the data obtained by reading \p slice until
 *        the reader returns no more data.
 *
 * @returns the finalized CRC
 *
 * @remark the slice's read position is advanced as the data is consumed.
 */
uint32_t rd_slice_crc32 (rd_slice_t *slice) {
        rd_crc32_t crc = rd_crc32_init();

        for (;;) {
                const void *chunk;
                size_t chunk_len = rd_slice_reader(slice, &chunk);

                if (!chunk_len)
                        break;

                crc = rd_crc32_update(crc, chunk, chunk_len);
        }

        return (uint32_t)rd_crc32_finalize(crc);
}

/**
 * @brief Compute the CRC-32C of the data obtained by reading \p slice,
 *        starting at its current read position, until the reader returns
 *        no more data. The checksum is accumulated chunk by chunk with an
 *        initial value of 0.
 *
 * @returns the calculated CRC-32C
 *
 * @remark the slice's read position is advanced as the data is consumed.
 */
uint32_t rd_slice_crc32c (rd_slice_t *slice) {
        uint32_t crc = 0;

        for (;;) {
                const void *chunk;
                size_t chunk_len = rd_slice_reader(slice, &chunk);

                if (!chunk_len)
                        break;

                crc = crc32c(crc, (const char *)chunk, chunk_len);
        }

        return crc;
}





/**
 * @name Debugging dumpers
 *
 *
 */

/**
 * @brief Print a one-line description of \p seg to stderr, prefixed by the
 *        indentation string \p ind.
 *
 *        If \p do_hexdump is non-zero the segment payload from relative
 *        offset \p relof up to the segment's seg_of is hexdumped as well.
 *
 * @remark \p relof must not exceed the segment's seg_of (asserted).
 */
static void rd_segment_dump (const rd_segment_t *seg, const char *ind,
                             size_t relof, int do_hexdump) {
        fprintf(stderr,
                "%s((rd_segment_t *)%p): "
                "p %p, of %"PRIusz", "
                "absof %"PRIusz", size %"PRIusz", free %p, flags 0x%x\n",
                ind, seg, seg->seg_p, seg->seg_of,
                seg->seg_absof, seg->seg_size, seg->seg_free, seg->seg_flags);

        rd_assert(relof <= seg->seg_of);

        if (!do_hexdump)
                return;

        rd_hexdump(stderr, "segment",
                   seg->seg_p+relof, seg->seg_of-relof);
}

/**
 * @brief Print the state of \p rbuf to stderr: its length/size counters,
 *        the write-position segment (if set) and all linked segments.
 *
 * @param do_hexdump if non-zero, each linked segment's payload is also
 *                   hexdumped (the write-position segment never is).
 */
void rd_buf_dump (const rd_buf_t *rbuf, int do_hexdump) {
        const rd_segment_t *seg;

        fprintf(stderr,
                "((rd_buf_t *)%p):\n"
                " len %"PRIusz" size %"PRIusz
                ", %"PRIusz"/%"PRIusz" extra memory used\n",
                rbuf, rbuf->rbuf_len, rbuf->rbuf_size,
                rbuf->rbuf_extra_len, rbuf->rbuf_extra_size);

        if (rbuf->rbuf_wpos) {
                fprintf(stderr, " wpos:\n");
                rd_segment_dump(rbuf->rbuf_wpos, "  ", 0, 0);
        }

        if (rbuf->rbuf_segment_cnt > 0) {
                size_t segcnt = 0;

                fprintf(stderr, " %"PRIusz" linked segments:\n",
                        rbuf->rbuf_segment_cnt);
                TAILQ_FOREACH(seg, &rbuf->rbuf_segments, seg_link) {
                        rd_segment_dump(seg, "  ", 0, do_hexdump);
                        /* Sanity check: the list must not hold more
                         * segments than the buffer accounts for.
                         * The increment is kept outside the assertion so
                         * the count does not depend on the assertion
                         * expression being evaluated. */
                        segcnt++;
                        rd_assert(segcnt <= rbuf->rbuf_segment_cnt);
                }
        }
}

/**
 * @brief Print the state of \p slice to stderr, followed by a dump of each
 *        segment from the slice's current segment to the end of the
 *        segment list.
 *
 * @param do_hexdump if non-zero, segment payloads are hexdumped too; for
 *                   the first segment the dump starts at the slice's
 *                   relative read offset, for the rest at offset 0.
 */
void rd_slice_dump (const rd_slice_t *slice, int do_hexdump) {
        const rd_segment_t *seg = slice->seg;
        size_t skip = slice->rof; /* Only applies to the first segment */

        fprintf(stderr,
                "((rd_slice_t *)%p):\n"
                "  buf %p (len %"PRIusz"), seg %p (absof %"PRIusz"), "
                "rof %"PRIusz", start %"PRIusz", end %"PRIusz", size %"PRIusz
                ", offset %"PRIusz"\n",
                slice, slice->buf, rd_buf_len(slice->buf),
                slice->seg, slice->seg ? slice->seg->seg_absof : 0,
                slice->rof, slice->start, slice->end,
                rd_slice_size(slice), rd_slice_offset(slice));

        while (seg) {
                rd_segment_dump(seg, "  ", skip, do_hexdump);
                skip = 0;
                seg = TAILQ_NEXT(seg, seg_link);
        }
}


/**
 * @name Unit-tests
 *
 *
 *
 */


/**
 * @brief Basic write+read test
 *
 *        Writes three runs of distinct byte values, checking both the
 *        position returned by each write() and the resulting write
 *        position, then reads everything back through a full slice and
 *        verifies the contents as well as that memory past the read
 *        length was left untouched.
 */
static int do_unittest_write_read (void) {
        rd_buf_t b;
        char ones[1024];
        char twos[1024];
        char threes[1024];
        char fiftyfives[100]; /* 0x55 indicates "untouched" memory */
        char buf[1024*3];
        rd_slice_t slice;
        size_t r, pos;

        memset(ones, 0x1, sizeof(ones));
        memset(twos, 0x2, sizeof(twos));
        memset(threes, 0x3, sizeof(threes));
        memset(fiftyfives, 0x55, sizeof(fiftyfives));
        memset(buf, 0x55, sizeof(buf));

        rd_buf_init(&b, 2, 1000);

        /*
         * Verify write
         *
         * write() is expected to return the position at which the
         * payload was written, i.e., the write position prior to
         * the call.
         */
        r = rd_buf_write(&b, ones, 200);
        RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r);
        pos = rd_buf_write_pos(&b);
        RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos);

        r = rd_buf_write(&b, twos, 800);
        RD_UT_ASSERT(r == 200, "write() returned position %"PRIusz, r);
        pos = rd_buf_write_pos(&b);
        RD_UT_ASSERT(pos == 200+800, "pos() returned position %"PRIusz, pos);

        /* Buffer grows here */
        r = rd_buf_write(&b, threes, 1);
        RD_UT_ASSERT(r == 200+800,
                     "write() returned position %"PRIusz, r);
        pos = rd_buf_write_pos(&b);
        RD_UT_ASSERT(pos == 200+800+1, "pos() returned position %"PRIusz, pos);

        /*
         * Verify read
         */
        /* Get full slice. */
        rd_slice_init_full(&slice, &b);

        /* Asking for one byte more than available must fail. */
        r = rd_slice_read(&slice, buf, 200+800+2);
        RD_UT_ASSERT(r == 0,
                     "read() > remaining should have failed, gave %"PRIusz, r);
        r = rd_slice_read(&slice, buf, 200+800+1);
        RD_UT_ASSERT(r == 200+800+1,
                     "read() returned %"PRIusz" (%"PRIusz" remains)",
                     r, rd_slice_remains(&slice));

        RD_UT_ASSERT(!memcmp(buf, ones, 200), "verify ones");
        RD_UT_ASSERT(!memcmp(buf+200, twos, 800), "verify twos");
        RD_UT_ASSERT(!memcmp(buf+200+800, threes, 1), "verify threes");
        /* Bytes following the read data must still hold the 0x55 fill. */
        RD_UT_ASSERT(!memcmp(buf+200+800+1, fiftyfives, 100), "verify 55s");

        rd_buf_destroy(&b);

        RD_UT_PASS();
}


/**
 * @brief Helper read verifier, not a unit-test itself.
 *
 *        Calls do_unittest_read_verify0() with the given arguments and
 *        asserts through RD_UT_ASSERT that it returned 0, reporting
 *        \p absof and \p len otherwise.
 */
#define do_unittest_read_verify(b,absof,len,verify) do {                \
                int __fail = do_unittest_read_verify0(b,absof,len,verify); \
                RD_UT_ASSERT(!__fail,                                   \
                             "read_verify(absof=%"PRIusz",len=%"PRIusz") " \
                             "failed", (size_t)(absof), (size_t)(len)); \
        } while (0)

/**
 * @brief Read \p len bytes at absolute buffer offset \p absof from \p b
 *        and compare them to \p verify, then seek back to the middle and
 *        verify the later half again through a sub-slice.
 *
 * @param verify expected contents, at least \p len bytes.
 *
 * @returns 0 if all checks pass. A failing RD_UT_ASSERT is expected to
 *          return early with a non-zero value.
 *
 * @remark \p len must not exceed the 1024 byte scratch buffer (asserted).
 *         The sub-slice checks compare against \p len / 2 and thus
 *         assume an even \p len.
 */
static int
do_unittest_read_verify0 (const rd_buf_t *b, size_t absof, size_t len,
                          const char *verify) {
        rd_slice_t slice, sub;
        char buf[1024];
        size_t half;
        size_t r;
        int i;

        rd_assert(sizeof(buf) >= len);

        /* Get reader slice */
        i = rd_slice_init(&slice, b, absof, len);
        RD_UT_ASSERT(i == 0, "slice_init() failed: %d", i);

        r = rd_slice_read(&slice, buf, len);
        RD_UT_ASSERT(r == len,
                     "read() returned %"PRIusz" expected %"PRIusz
                     " (%"PRIusz" remains)",
                     r, len, rd_slice_remains(&slice));

        RD_UT_ASSERT(!memcmp(buf, verify, len), "verify");

        r = rd_slice_offset(&slice);
        RD_UT_ASSERT(r == len, "offset() returned %"PRIusz", not %"PRIusz,
                     r, len);

        half = len / 2;
        i = rd_slice_seek(&slice, half);
        RD_UT_ASSERT(i == 0, "seek(%"PRIusz") returned %d", half, i);
        r = rd_slice_offset(&slice);
        RD_UT_ASSERT(r == half, "offset() returned %"PRIusz", not %"PRIusz,
                     r, half);

        /* Get a sub-slice covering the later half. */
        sub = rd_slice_pos(&slice);
        r = rd_slice_offset(&sub);
        RD_UT_ASSERT(r == 0, "sub: offset() returned %"PRIusz", not %"PRIusz,
                     r, (size_t)0);
        r = rd_slice_size(&sub);
        RD_UT_ASSERT(r == half, "sub: size() returned %"PRIusz", not %"PRIusz,
                     r, half);
        r = rd_slice_remains(&sub);
        RD_UT_ASSERT(r == half,
                     "sub: remains() returned %"PRIusz", not %"PRIusz,
                     r, half);

        /* Read half */
        r = rd_slice_read(&sub, buf, half);
        RD_UT_ASSERT(r == half,
                     "sub read() returned %"PRIusz" expected %"PRIusz
                     " (%"PRIusz" remains)",
                     r, half, rd_slice_remains(&sub));

        /* Only the first \c half bytes of buf were just overwritten, and
         * they come from the later half of the verified range. */
        RD_UT_ASSERT(!memcmp(buf, verify + half, half), "verify");

        r = rd_slice_offset(&sub);
        RD_UT_ASSERT(r == rd_slice_size(&sub),
                     "sub offset() returned %"PRIusz", not %"PRIusz,
                     r, rd_slice_size(&sub));
        r = rd_slice_remains(&sub);
        RD_UT_ASSERT(r == 0,
                     "sub: remains() returned %"PRIusz", not %"PRIusz,
                     r, (size_t)0);

        return 0;
}


/**
 * @brief write_seek() and split() test
 *
 *        Writes data, seeks the write position backwards and overwrites,
 *        splits a segment mid-way, and finally seeks to the split boundary,
 *        verifying the readable contents and write position along the way.
 */
static int do_unittest_write_split_seek (void) {
        rd_buf_t b;
        char ones[1024];
        char twos[1024];
        size_t r, pos;
        rd_segment_t *seg, *newseg;

        memset(ones, 0x1, sizeof(ones));
        memset(twos, 0x2, sizeof(twos));

        rd_buf_init(&b, 0, 0);

        /*
         * Verify write
         */
        r = rd_buf_write(&b, ones, 400);
        RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r);
        pos = rd_buf_write_pos(&b);
        RD_UT_ASSERT(pos == 400, "pos() returned position %"PRIusz, pos);

        do_unittest_read_verify(&b, 0, 400, ones);

        /*
         * Seek and re-write
         */
        r = rd_buf_write_seek(&b, 200);
        RD_UT_ASSERT(r == 0, "seek() failed");
        pos = rd_buf_write_pos(&b);
        RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos);

        /* write() is expected to return the position the payload was
         * written at, which is the position just seeked to. */
        r = rd_buf_write(&b, twos, 100);
        RD_UT_ASSERT(r == 200, "write() returned position %"PRIusz, r);
        pos = rd_buf_write_pos(&b);
        RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);

        do_unittest_read_verify(&b, 0, 200, ones);
        do_unittest_read_verify(&b, 200, 100, twos);

        /* Make sure read() did not modify the write position. */
        pos = rd_buf_write_pos(&b);
        RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);

        /* Split buffer, write position is now at split where writes
         * are not allowed (mid buffer). */
        seg = rd_buf_get_segment_at_offset(&b, NULL, 50);
        RD_UT_ASSERT(seg != NULL, "seg");
        RD_UT_ASSERT(seg->seg_of != 0, "assumed mid-segment");
        newseg = rd_segment_split(&b, seg, 50);
        rd_buf_append_segment(&b, newseg);
        seg = rd_buf_get_segment_at_offset(&b, NULL, 50);
        RD_UT_ASSERT(seg != NULL, "seg");
        RD_UT_ASSERT(seg == newseg, "newseg %p, seg %p", newseg, seg);
        RD_UT_ASSERT(seg->seg_of > 0,
                     "assumed beginning of segment, got %"PRIusz, seg->seg_of);

        pos = rd_buf_write_pos(&b);
        RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);

        /* Re-verify that nothing changed */
        do_unittest_read_verify(&b, 0, 200, ones);
        do_unittest_read_verify(&b, 200, 100, twos);

        /* Do a write seek at buffer boundary, sub-sequent buffers should
         * be destroyed. */
        r = rd_buf_write_seek(&b, 50);
        RD_UT_ASSERT(r == 0, "seek() failed");
        do_unittest_read_verify(&b, 0, 50, ones);

        rd_buf_destroy(&b);

        RD_UT_PASS();
}

/**
 * @brief Unittest to verify payload is correctly written and read.
 *        Each written u32 word is the running CRC of the word count.
 *
 *        The same running CRC is recomputed while reading the words back,
 *        first with peek() and then with read(), and each word as well as
 *        the finalized CRC must match what was written.
 */
static int do_unittest_write_read_payload_correctness (void) {
        uint32_t crc;
        uint32_t write_crc, read_crc;
        const int seed = 12345;
        rd_buf_t b;
        const size_t max_cnt = 20000;
        rd_slice_t slice;
        size_t r;
        size_t i;
        int pass;

        crc = rd_crc32_init();
        crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed));

        rd_buf_init(&b, 0, 0);
        for (i = 0 ; i < max_cnt ; i++) {
                crc = rd_crc32_update(crc, (void *)&i, sizeof(i));
                rd_buf_write(&b, &crc, sizeof(crc));
        }

        write_crc = rd_crc32_finalize(crc);

        r = rd_buf_len(&b);
        RD_UT_ASSERT(r == max_cnt * sizeof(crc),
                     "expected length %"PRIusz", not %"PRIusz,
                     max_cnt * sizeof(crc), r);

        /*
         * Now verify the contents with a reader.
         */
        rd_slice_init_full(&slice, &b);

        r = rd_slice_remains(&slice);
        RD_UT_ASSERT(r == rd_buf_len(&b),
                     "slice remains %"PRIusz", should be %"PRIusz,
                     r, rd_buf_len(&b));

        for (pass = 0 ; pass < 2 ; pass++) {
                /* Two passes:
                 *  - pass 1: using peek()
                 *  - pass 2: using read()
                 */
                const char *pass_str = pass == 0 ? "peek":"read";

                crc = rd_crc32_init();
                crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed));

                for (i = 0 ; i < max_cnt ; i++) {
                        uint32_t buf_crc;

                        /* Must feed exactly the same bytes as the write
                         * loop did: the size of the counter itself, not
                         * the size of a pointer to it. */
                        crc = rd_crc32_update(crc, (void *)&i, sizeof(i));

                        if (pass == 0)
                                r = rd_slice_peek(&slice, i * sizeof(buf_crc),
                                                  &buf_crc, sizeof(buf_crc));
                        else
                                r = rd_slice_read(&slice, &buf_crc,
                                                  sizeof(buf_crc));
                        RD_UT_ASSERT(r == sizeof(buf_crc),
                                     "%s() at #%"PRIusz" failed: "
                                     "r is %"PRIusz" not %"PRIusz,
                                     pass_str, i, r, sizeof(buf_crc));
                        RD_UT_ASSERT(buf_crc == crc,
                                     "%s: invalid crc at #%"PRIusz
                                     ": expected %"PRIu32", read %"PRIu32,
                                     pass_str, i, crc, buf_crc);
                }

                read_crc = rd_crc32_finalize(crc);

                RD_UT_ASSERT(read_crc == write_crc,
                             "%s: finalized read crc %"PRIu32
                             " != write crc %"PRIu32,
                             pass_str, read_crc, write_crc);

        }

        /* The read() pass must have consumed the entire slice. */
        r = rd_slice_remains(&slice);
        RD_UT_ASSERT(r == 0,
                     "slice remains %"PRIusz", should be %"PRIusz,
                     r, (size_t)0);

        rd_buf_destroy(&b);

        RD_UT_PASS();
}

/**
 * @brief Helper iovec verifier, not a unit-test itself.
 *
 *        Forwards its arguments to do_unittest_iov_verify0() and asserts
 *        through RD_UT_ASSERT that the call returned 0.
 */
#define do_unittest_iov_verify(...) do {                                \
                int __fail = do_unittest_iov_verify0(__VA_ARGS__);      \
                RD_UT_ASSERT(!__fail, "iov_verify() failed");           \
        } while (0)
/**
 * @brief Fetch the write iovecs of \p b, requesting \p exp_totsize bytes,
 *        and verify them: the total size must be at least \p exp_totsize,
 *        the iovec count must be in the range
 *        \p exp_iovcnt .. MY_IOV_MAX, each iovec must have a base pointer
 *        and a non-zero length, and the lengths must add up to the total.
 *
 * @returns 0 if all checks pass. A failing RD_UT_ASSERT is expected to
 *          return early with a non-zero value.
 */
static int do_unittest_iov_verify0 (rd_buf_t *b,
                                    size_t exp_iovcnt, size_t exp_totsize) {
        #define MY_IOV_MAX 16
        struct iovec iov[MY_IOV_MAX];
        size_t iovcnt;
        size_t i;
        size_t totsize, sum;

        rd_assert(exp_iovcnt <= MY_IOV_MAX);

        totsize = rd_buf_get_write_iov(b, iov, &iovcnt, MY_IOV_MAX, exp_totsize);
        RD_UT_ASSERT(totsize >= exp_totsize,
                     "iov total size %"PRIusz" expected >= %"PRIusz,
                     totsize, exp_totsize);
        /* The lower bound is inclusive, and the message says so. */
        RD_UT_ASSERT(iovcnt >= exp_iovcnt && iovcnt <= MY_IOV_MAX,
                     "iovcnt %"PRIusz
                     ", expected %"PRIusz" <= x <= MY_IOV_MAX",
                     iovcnt, exp_iovcnt);

        sum = 0;
        for (i = 0 ; i < iovcnt ; i++) {
                RD_UT_ASSERT(iov[i].iov_base,
                             "iov #%"PRIusz" iov_base not set", i);
                RD_UT_ASSERT(iov[i].iov_len,
                             "iov #%"PRIusz" iov_len %"PRIusz" out of range",
                             i, iov[i].iov_len);
                sum += iov[i].iov_len;
                /* Fail as soon as the running sum exceeds the total. */
                RD_UT_ASSERT(sum <= totsize, "sum %"PRIusz" > totsize %"PRIusz,
                             sum, totsize);
        }

        RD_UT_ASSERT(sum == totsize,
                     "sum %"PRIusz" != totsize %"PRIusz,
                     sum, totsize);

        return 0;
}


/**
 * @brief Check the buffer's write-iovec view, first after ensuring room
 *        for an initial write and then again after ensuring room for a
 *        second, larger one.
 */
static int do_unittest_write_iov (void) {
        const size_t first_size  = 100;
        const size_t second_size = 30000;
        rd_buf_t b;

        rd_buf_init(&b, 0, 0);

        /* Initial buffer: expect at least one iovec covering it. */
        rd_buf_write_ensure(&b, first_size, first_size);
        do_unittest_iov_verify(&b, 1, first_size);

        /* Add a secondary buffer: expect at least two iovecs covering
         * both sizes combined. */
        rd_buf_write_ensure(&b, second_size, 0);
        do_unittest_iov_verify(&b, 2, first_size + second_size);

        rd_buf_destroy(&b);

        RD_UT_PASS();
}


int unittest_rdbuf (void) {
        int fails = 0;

        fails += do_unittest_write_read();
        fails += do_unittest_write_split_seek();
        fails += do_unittest_write_read_payload_correctness();
        fails += do_unittest_write_iov();

        return fails;
}