/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.h"
#include "rdkafka.h"
/**
* @brief Verify handling of compacted topics.
*
* General idea:
* - create a compacted topic with a low cleanup interval to promote quick
* compaction.
* - produce messages for 3 keys and interleave with unkeyed messages.
* interleave tombstones for k1 and k2, but not k3.
* - consume before compaction - verify all messages in place
* - wait for compaction
* - consume after compaction - verify expected messages.
*/
/**
 * @brief Get the low watermark of a partition; we use this to see if
 *        compaction has kicked in.
 */
static int64_t get_low_wmark (rd_kafka_t *rk, const char *topic,
                              int32_t partition) {
        rd_kafka_resp_err_t err;
        int64_t low, high;

        err = rd_kafka_query_watermark_offsets(rk, topic, partition,
                                               &low, &high,
                                               tmout_multip(10000));

        /* Fix: error message previously said "query_warmark_offsets",
         * which is not the name of any API and misleads debugging. */
        TEST_ASSERT(!err, "query_watermark_offsets(%s, %d) failed: %s",
                    topic, (int)partition, rd_kafka_err2str(err));

        return low;
}
/**
* @brief Wait for compaction by checking for
* partition low-watermark increasing */
/**
 * @brief Poll the partition's low watermark until it rises above
 *        \p low_offset (indicating segment deletion by the log cleaner)
 *        or until \p timeout_ms has elapsed, whichever comes first.
 */
static void wait_compaction (rd_kafka_t *rk,
                             const char *topic, int32_t partition,
                             int64_t low_offset,
                             int timeout_ms) {
        /* Absolute deadline in the same (microsecond) clock domain
         * as test_clock(). */
        const int64_t abs_deadline =
                test_clock() + ((int64_t)timeout_ms * 1000);
        int64_t low = -1;

        TEST_SAY("Waiting for compaction to kick in and increase the "
                 "Low watermark offset from %"PRId64" on %s [%"PRId32"]\n",
                 low_offset, topic, partition);

        for (;;) {
                low = get_low_wmark(rk, topic, partition);

                TEST_SAY("Low watermark offset for %s [%"PRId32"] is "
                         "%"PRId64" (want > %"PRId64")\n",
                         topic, partition, low, low_offset);

                /* Stop on success or when the deadline has passed. */
                if (low > low_offset || test_clock() > abs_deadline)
                        break;

                rd_sleep(5);
        }
}
/**
 * @brief Produce \p msgcnt - 1 messages of \p msgsize bytes followed by
 *        one tombstone (NULL value), all with the same randomly generated
 *        key, making all but the tombstone eligible for compaction.
 *
 * @param topic     Topic to produce to.
 * @param partition Partition to produce to.
 * @param testid    Test id, or 0 to generate a new one.
 * @param msgcnt    Total number of messages to produce (including the
 *                  final tombstone).
 * @param msgsize   Value size (in bytes) of each non-tombstone message.
 *
 * Blocks until all messages are delivered (asserts on failure).
 */
static void produce_compactable_msgs (const char *topic, int32_t partition,
                                      uint64_t testid,
                                      int msgcnt, size_t msgsize) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        int i;
        char *val;
        char key[16];
        rd_kafka_resp_err_t err;
        int msgcounter = msgcnt;

        if (!testid)
                testid = test_id_generate();

        test_str_id_generate(key, sizeof(key));

        /* Zeroed dummy payload shared by all non-tombstone messages. */
        val = calloc(1, msgsize);
        /* Fix: calloc() result was previously used unchecked; a NULL
         * return would crash inside librdkafka rather than fail cleanly. */
        TEST_ASSERT(val != NULL,
                    "calloc(1, %"PRIusz") failed", msgsize);

        TEST_SAY("Producing %d messages (total of %"PRIusz" bytes) of "
                 "compactable messages\n", msgcnt, (size_t)msgcnt*msgsize);

        test_conf_init(&conf, NULL, 0);
        rd_kafka_conf_set_dr_cb(conf, test_dr_cb);
        /* Make sure batch size does not exceed segment.bytes since that
         * will make the ProduceRequest fail. */
        test_conf_set(conf, "batch.num.messages", "1");

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        for (i = 0 ; i < msgcnt-1 ; i++) {
                err = rd_kafka_producev(rk,
                                        RD_KAFKA_V_TOPIC(topic),
                                        RD_KAFKA_V_PARTITION(partition),
                                        RD_KAFKA_V_KEY(key, sizeof(key)-1),
                                        RD_KAFKA_V_VALUE(val, msgsize),
                                        RD_KAFKA_V_OPAQUE(&msgcounter),
                                        RD_KAFKA_V_END);
                TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err));
        }

        /* Final message is the tombstone (no VALUE => NULL value). */
        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_PARTITION(partition),
                                RD_KAFKA_V_KEY(key, sizeof(key)-1),
                                RD_KAFKA_V_OPAQUE(&msgcounter),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err));

        test_flush(rk, tmout_multip(10000));
        /* test_dr_cb decrements msgcounter per delivered message;
         * it must reach exactly zero. */
        TEST_ASSERT(msgcounter == 0, "%d messages unaccounted for", msgcounter);

        rd_kafka_destroy(rk);

        free(val);
}
/**
 * @brief Run one full compaction test pass:
 *        create a compacted topic, produce keyed messages (including
 *        tombstones for keys[0] and unique-keyed messages that must
 *        survive), wait for the broker's log cleaner, then consume and
 *        verify that only the expected messages remain.
 *
 * @param msgs_per_key Number of messages produced per key.
 * @param compression  Producer compression.codec, or NULL for none.
 */
static void do_test_compaction (int msgs_per_key, const char *compression) {
        const char *topic = test_mk_topic_name(__FILE__, 1);
#define _KEY_CNT 4
        /* Three fixed keys; the NULL slot makes a unique key per message,
         * so those messages can never be compacted away. */
        const char *keys[_KEY_CNT] = { "k1", "k2", "k3", NULL/*generate unique*/ };
        int msgcnt = msgs_per_key * _KEY_CNT;
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        uint64_t testid;
        int32_t partition = 0;
        int cnt = 0;
        test_msgver_t mv;           /* what was actually consumed */
        test_msgver_t mv_correct;   /* what should remain after compaction */
        int msgcounter = 0;
        const int fillcnt = 20;     /* dummy messages to fill first segment */

        testid = test_id_generate();

        TEST_SAY(_C_MAG "Test compaction on topic %s with %s compression (%d messages)\n",
                 topic, compression ? compression : "no", msgcnt);

        /* Small segments + low dirty ratio to make the broker's log
         * cleaner run quickly on this topic. */
        test_kafka_topics("--create --topic \"%s\" "
                          "--partitions %d "
                          "--replication-factor 1 "
                          "--config cleanup.policy=compact "
                          "--config segment.ms=10000 "
                          "--config segment.bytes=10000 "
                          "--config min.cleanable.dirty.ratio=0.01 "
                          "--config delete.retention.ms=86400 "
                          "--config file.delete.delay.ms=10000",
                          topic, partition+1);

        test_conf_init(&conf, NULL, 120);
        rd_kafka_conf_set_dr_cb(conf, test_dr_cb);
        if (compression)
                test_conf_set(conf, "compression.codec", compression);
        /* Limit max batch size below segment.bytes to avoid messages
         * to accumulate into a batch that will be rejected by the broker. */
        test_conf_set(conf, "message.max.bytes", "6000");
        test_conf_set(conf, "linger.ms", "10");
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
        rkt = rd_kafka_topic_new(rk, topic, NULL);

        /* The low watermark is not updated on message deletion(compaction)
         * but on segment deletion, so fill up the first segment with
         * random messages eligible for hasty compaction.
         * NOTE(review): argument order looks swapped versus the
         * declaration (topic, partition, testid, ...): this passes
         * partition=0, testid=partition. Benign only because
         * partition == 0 here — confirm intended order. */
        produce_compactable_msgs(topic, 0, partition, fillcnt, 1000);

        /* Populate a correct msgver for later comparison after compact. */
        test_msgver_init(&mv_correct, testid);

        TEST_SAY("Producing %d messages for %d keys\n", msgcnt, _KEY_CNT);
        for (cnt = 0 ; cnt < msgcnt ; ) {
                int k;

                for (k = 0 ; k < _KEY_CNT ; k++) {
                        rd_kafka_resp_err_t err;
                        /* Last round of keys: these survive compaction. */
                        int is_last = cnt + _KEY_CNT >= msgcnt;
                        /* Let keys[0] have some tombstones */
                        int is_tombstone = (k == 0 && (is_last || !(cnt % 7)));
                        char *valp;
                        size_t valsize;
                        char rdk_msgid[256];
                        char unique_key[16];
                        const void *key;
                        size_t keysize;
                        /* Expected broker offset: fillcnt messages were
                         * produced before this loop. */
                        int64_t offset = fillcnt + cnt;

                        test_msg_fmt(rdk_msgid, sizeof(rdk_msgid),
                                     testid, partition, cnt);

                        if (is_tombstone) {
                                valp = NULL;
                                valsize = 0;
                        } else {
                                valp = rdk_msgid;
                                valsize = strlen(valp);
                        }

                        /* keys[3] is NULL: generate a unique key so the
                         * message is never superseded and thus survives. */
                        if (!(key = keys[k])) {
                                rd_snprintf(unique_key, sizeof(unique_key),
                                            "%d", cnt);
                                key = unique_key;
                        }
                        keysize = strlen(key);

                        /* All unique-key messages should remain intact
                         * after compaction. */
                        if (!keys[k] || is_last) {
                                TEST_SAYL(4,
                                          "Add to correct msgvec: "
                                          "msgid: %d: %s is_last=%d, "
                                          "is_tomb=%d\n",
                                          cnt, (const char *)key,
                                          is_last, is_tombstone);
                                test_msgver_add_msg00(__FUNCTION__, __LINE__,
                                                      &mv_correct, testid,
                                                      topic, partition,
                                                      offset, -1, 0, cnt);
                        }


                        msgcounter++;
                        err = rd_kafka_producev(
                                rk,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_PARTITION(0),
                                RD_KAFKA_V_KEY(key, keysize),
                                RD_KAFKA_V_VALUE(valp, valsize),
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                /* The per-message id is carried in a header
                                 * since compaction may drop the value. */
                                RD_KAFKA_V_HEADER("rdk_msgid", rdk_msgid, -1),
                                /* msgcounter as msg_opaque is used
                                 * by test delivery report callback to
                                 * count number of messages. */
                                RD_KAFKA_V_OPAQUE(&msgcounter),
                                RD_KAFKA_V_END);
                        TEST_ASSERT(!err, "producev(#%d) failed: %s",
                                    cnt, rd_kafka_err2str(err));

                        cnt++;
                }
        }

        TEST_ASSERT(cnt == msgcnt, "cnt %d != msgcnt %d", cnt, msgcnt);

        msgcounter = cnt;
        test_wait_delivery(rk, &msgcounter);

        /* Trigger compaction by filling up the segment with dummy messages,
         * do it in chunks to avoid too good compression which then won't
         * fill up the segments..
         * We can't reuse the existing producer instance because it
         * might be using compression which makes it hard to know how
         * much data we need to produce to trigger compaction.
         * NOTE(review): same apparent partition/testid swap as above —
         * confirm intended argument order. */
        produce_compactable_msgs(topic, 0, partition, 20, 1024);

        /* Wait for compaction:
         * this doesn't really work because the low watermark offset
         * is not updated on compaction if the first segment is not deleted.
         * But it serves as a pause to let compaction kick in
         * which is triggered by the dummy produce above. */
        wait_compaction(rk, topic, partition, 0, 20*1000);

        TEST_SAY(_C_YEL "Verify messages after compaction\n");
        /* After compaction we expect the following messages:
         * last message for each of k1, k2, k3, all messages for unkeyed. */
        test_msgver_init(&mv, testid);
        /* Verify against the msgid carried in the "rdk_msgid" header,
         * not the message value (tombstones have no value). */
        mv.msgid_hdr = "rdk_msgid";
        test_consume_msgs_easy_mv(NULL, topic, testid, 1, -1, NULL, &mv);
        test_msgver_verify_compare("post-compaction", &mv, &mv_correct,
                                   TEST_MSGVER_BY_MSGID|TEST_MSGVER_BY_OFFSET);
        test_msgver_clear(&mv);
        test_msgver_clear(&mv_correct);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);

        TEST_SAY(_C_GRN "Compaction test with %s compression: PASS\n",
                 compression ? compression : "no");
}
/**
 * @brief Test entry point: runs the compaction test with varying
 *        message counts, uncompressed and (when built in) with
 *        snappy and gzip compression.
 */
int main_0077_compaction (int argc, char **argv) {
        /* Requires the ability to create topics with custom configs. */
        if (!test_can_create_topics(1))
                return 0;

        /* Uncompressed runs: small and large. */
        do_test_compaction(10, NULL);
        do_test_compaction(1000, NULL);

        /* Compressed runs, gated on build-time codec support. */
#if WITH_SNAPPY
        do_test_compaction(10, "snappy");
#endif
#if WITH_ZLIB
        do_test_compaction(10000, "gzip");
#endif

        return 0;
}