Blob Blame History Raw
/*
 * librdkafka - The Apache Kafka C/C++ library
 *
 * Copyright (c) 2017 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rdkafka_int.h"
#include "rdkafka_header.h"



#define rd_kafka_header_destroy        rd_free

void rd_kafka_headers_destroy (rd_kafka_headers_t *hdrs) {
        rd_list_destroy(&hdrs->rkhdrs_list);
        rd_free(hdrs);
}

rd_kafka_headers_t *rd_kafka_headers_new (size_t initial_count) {
        rd_kafka_headers_t *hdrs;

        hdrs = rd_malloc(sizeof(*hdrs));
        rd_list_init(&hdrs->rkhdrs_list, (int)initial_count,
                     rd_kafka_header_destroy);
        hdrs->rkhdrs_ser_size = 0;

        return hdrs;
}

static void *rd_kafka_header_copy (const void *_src, void *opaque) {
        rd_kafka_headers_t *hdrs = opaque;
        const rd_kafka_header_t *src = (const rd_kafka_header_t *)_src;

        return (void *)rd_kafka_header_add(
                hdrs,
                src->rkhdr_name, src->rkhdr_name_size,
                src->rkhdr_value, src->rkhdr_value_size);
}

rd_kafka_headers_t *
rd_kafka_headers_copy (const rd_kafka_headers_t *src) {
        rd_kafka_headers_t *dst;

        dst = rd_malloc(sizeof(*dst));
        rd_list_init(&dst->rkhdrs_list, rd_list_cnt(&src->rkhdrs_list),
                     rd_kafka_header_destroy);
        dst->rkhdrs_ser_size = 0; /* Updated by header_copy() */
        rd_list_copy_to(&dst->rkhdrs_list, &src->rkhdrs_list,
                        rd_kafka_header_copy, dst);

        return dst;
}



rd_kafka_resp_err_t
rd_kafka_header_add (rd_kafka_headers_t *hdrs,
                     const char *name, ssize_t name_size,
                     const void *value, ssize_t value_size) {
        rd_kafka_header_t *hdr;
        char varint_NameLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
        char varint_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)];

        if (name_size == -1)
                name_size = strlen(name);

        if (value_size == -1)
                value_size = value ? strlen(value) : 0;
        else if (!value)
                value_size = 0;

        hdr = rd_malloc(sizeof(*hdr) + name_size + 1 + value_size + 1);
        hdr->rkhdr_name_size = name_size;
        memcpy((void *)hdr->rkhdr_name, name, name_size);
        hdr->rkhdr_name[name_size] = '\0';

        if (likely(value != NULL)) {
                hdr->rkhdr_value = hdr->rkhdr_name+name_size+1;
                memcpy((void *)hdr->rkhdr_value, value, value_size);
                hdr->rkhdr_value[value_size] = '\0';
                hdr->rkhdr_value_size        = value_size;
        } else {
                hdr->rkhdr_value      = NULL;
                hdr->rkhdr_value_size = 0;
        }

        rd_list_add(&hdrs->rkhdrs_list, hdr);

        /* Calculate serialized size of header */
        hdr->rkhdr_ser_size = name_size + value_size;
        hdr->rkhdr_ser_size += rd_uvarint_enc_i64(varint_NameLen,
                                                  sizeof(varint_NameLen),
                                                  name_size);
        hdr->rkhdr_ser_size += rd_uvarint_enc_i64(varint_ValueLen,
                                                  sizeof(varint_ValueLen),
                                                  value_size);
        hdrs->rkhdrs_ser_size += hdr->rkhdr_ser_size;

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief header_t(name) to char * comparator
 */
static int rd_kafka_header_cmp_str (void *_a, void *_b) {
        const rd_kafka_header_t *a = _a;
        const char *b = _b;

        return strcmp(a->rkhdr_name, b);
}

rd_kafka_resp_err_t rd_kafka_header_remove (rd_kafka_headers_t *hdrs,
                                            const char *name) {
        size_t ser_size = 0;
        rd_kafka_header_t *hdr;
        int i;

        RD_LIST_FOREACH_REVERSE(hdr, &hdrs->rkhdrs_list, i) {
                if (rd_kafka_header_cmp_str(hdr, (void *)name))
                        continue;

                ser_size += hdr->rkhdr_ser_size;
                rd_list_remove_elem(&hdrs->rkhdrs_list, i);
                rd_kafka_header_destroy(hdr);
        }

        if (ser_size == 0)
                return RD_KAFKA_RESP_ERR__NOENT;

        rd_dassert(hdrs->rkhdrs_ser_size >= ser_size);
        hdrs->rkhdrs_ser_size -= ser_size;

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

rd_kafka_resp_err_t
rd_kafka_header_get_last (const rd_kafka_headers_t *hdrs,
                          const char *name,
                          const void **valuep, size_t *sizep) {
        const rd_kafka_header_t *hdr;
        int i;
        size_t name_size = strlen(name);

        RD_LIST_FOREACH_REVERSE(hdr, &hdrs->rkhdrs_list, i) {
                if (hdr->rkhdr_name_size == name_size &&
                    !strcmp(hdr->rkhdr_name, name)) {
                        *valuep = hdr->rkhdr_value;
                        *sizep = hdr->rkhdr_value_size;
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }
        }

        return RD_KAFKA_RESP_ERR__NOENT;
}


rd_kafka_resp_err_t
rd_kafka_header_get (const rd_kafka_headers_t *hdrs, size_t idx,
                     const char *name,
                     const void **valuep, size_t *sizep) {
        const rd_kafka_header_t *hdr;
        int i;
        size_t mi = 0; /* index for matching names */
        size_t name_size = strlen(name);

        RD_LIST_FOREACH(hdr, &hdrs->rkhdrs_list, i) {
                if (hdr->rkhdr_name_size == name_size &&
                    !strcmp(hdr->rkhdr_name, name) &&
                    mi++ == idx) {
                        *valuep = hdr->rkhdr_value;
                        *sizep = hdr->rkhdr_value_size;
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }
        }

        return RD_KAFKA_RESP_ERR__NOENT;
}


rd_kafka_resp_err_t
rd_kafka_header_get_all (const rd_kafka_headers_t *hdrs, size_t idx,
                         const char **namep,
                         const void **valuep, size_t *sizep) {
        const rd_kafka_header_t *hdr;

        hdr = rd_list_elem(&hdrs->rkhdrs_list, (int)idx);
        if (unlikely(!hdr))
                return RD_KAFKA_RESP_ERR__NOENT;

        *namep  = hdr->rkhdr_name;
        *valuep = hdr->rkhdr_value;
        *sizep  = hdr->rkhdr_value_size;
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


size_t rd_kafka_header_cnt(const rd_kafka_headers_t *hdrs) {
        return (size_t)rd_list_cnt(&hdrs->rkhdrs_list);
}