/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.h"
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
/**
* Basic compression tests, with rather lacking verification.
*/
int main_0017_compression(int argc, char **argv) {
rd_kafka_t *rk_p, *rk_c;
const int msg_cnt = 1000;
int msg_base = 0;
uint64_t testid;
#define CODEC_CNT 4
const char *codecs[CODEC_CNT+1] = {
"none",
#if WITH_ZLIB
"gzip",
#endif
#if WITH_SNAPPY
"snappy",
#endif
"lz4",
NULL
};
const char *topics[CODEC_CNT];
const int32_t partition = 0;
int i;
int crc;
testid = test_id_generate();
/* Produce messages */
rk_p = test_create_producer();
for (i = 0; codecs[i] != NULL ; i++) {
rd_kafka_topic_t *rkt_p;
topics[i] = test_mk_topic_name(codecs[i], 1);
TEST_SAY("Produce %d messages with %s compression to "
"topic %s\n",
msg_cnt, codecs[i], topics[i]);
rkt_p = test_create_producer_topic(rk_p, topics[i],
"compression.codec", codecs[i], NULL);
/* Produce small message that will not decrease with
* compression (issue #781) */
test_produce_msgs(rk_p, rkt_p, testid, partition,
msg_base + (partition*msg_cnt), 1,
NULL, 5);
/* Produce standard sized messages */
test_produce_msgs(rk_p, rkt_p, testid, partition,
msg_base + (partition*msg_cnt) + 1, msg_cnt-1,
NULL, 512);
rd_kafka_topic_destroy(rkt_p);
}
rd_kafka_destroy(rk_p);
/* restart timeout (mainly for helgrind use since it is very slow) */
test_timeout_set(30);
/* Consume messages: Without and with CRC checking */
for (crc = 0 ; crc < 2 ; crc++) {
const char *crc_tof = crc ? "true":"false";
rd_kafka_conf_t *conf;
test_conf_init(&conf, NULL, 0);
test_conf_set(conf, "check.crcs", crc_tof);
rk_c = test_create_consumer(NULL, NULL, conf, NULL);
for (i = 0; codecs[i] != NULL ; i++) {
rd_kafka_topic_t *rkt_c = rd_kafka_topic_new(rk_c,
topics[i],
NULL);
TEST_SAY("Consume %d messages from topic %s (crc=%s)\n",
msg_cnt, topics[i], crc_tof);
/* Start consuming */
test_consumer_start(codecs[i], rkt_c, partition,
RD_KAFKA_OFFSET_BEGINNING);
/* Consume messages */
test_consume_msgs(
codecs[i], rkt_c, testid, partition,
/* Use offset 0 here, which is wrong, should
* be TEST_NO_SEEK, but it exposed a bug
* where the Offset query was postponed
* till after the seek, causing messages
* to be replayed. */
0,
msg_base, msg_cnt, 1 /* parse format */);
test_consumer_stop(codecs[i], rkt_c, partition);
rd_kafka_topic_destroy(rkt_c);
}
rd_kafka_destroy(rk_c);
}
return 0;
}