Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2014, Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * - Generate a unique topic name (there is a C function for that in test.h which you should use)
 * - Query metadata for that topic
 * - Wait one second
 * - Query again, it should now have isrs and everything
 * Note: The test requires auto.create.topics.enable = true in the Kafka server properties.
 */


#define _GNU_SOURCE
#include <sys/time.h>
#include <time.h>
#include <string>
#include <sstream>
#include <iostream>


extern "C" {
#include "test.h"
}

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafkacpp.h"  /* for Kafka driver */

/**
 * Generate unique topic name (there is a C function for that in test.h wihch you should use)
 * Query metadata for that topic
 * Wait one second
 * Query again, it should now have isrs and everything
 */
static void test_metadata_cpp (void) {
	RdKafka::Conf *conf  = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); /* @TODO: Do we need to merge with C test_conf_init()? */
	RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /* @TODO: Same of prev */

	RdKafka::Metadata *metadata;
	RdKafka::ErrorCode err;
	int msgcnt = 10000;
	int partition_cnt = 2;
	int i;
	uint64_t testid;
	int msg_base = 0;
	std::string errstr;
	const char *topic_str = test_mk_topic_name("0013", 1);
/*	if(!topic){
		TEST_FAIL()
	}*/

	//const RdKafka::Conf::ConfResult confResult = conf->set("debug","all",errstr);
	//if(confResult != RdKafka::Conf::CONF_OK){
	//	std::stringstream errstring;
	//	errstring << "Can't set config" << errstr;
	//	TEST_FAIL(errstring.str().c_str());
	//}

	TEST_SAY("Topic %s.\n", topic_str);
	
	const RdKafka::Conf::ConfResult confBrokerResult = conf->set("metadata.broker.list", "localhost:9092", errstr);
	if(confBrokerResult != RdKafka::Conf::CONF_OK){
		std::stringstream errstring;
		errstring << "Can't set broker" << errstr;
		TEST_FAIL(errstring.str().c_str());
	}

	/* Create a producer to fetch metadata */
	RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
	if (!producer) {
		std::stringstream errstring;
		errstring << "Can't create producer" << errstr;
		TEST_FAIL(errstring.str().c_str());
	}

	/*
	* Create topic handle.
	*/
	RdKafka::Topic *topic = NULL;
	topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
	if (!topic) {
		std::stringstream errstring;
		errstring << "Can't create topic" << errstr;
		exit(1);
	}

	/* First request of metadata: It have to fail */
	err = producer->metadata(topic!=NULL, topic,
	                      &metadata, 5000);
	if (err != RdKafka::ERR_NO_ERROR) {
		std::stringstream errstring;
		errstring << "Can't request first metadata: " << errstr;
		TEST_FAIL(errstring.str().c_str());
	}

	/* It's a new topic, it should have no partitions */
	if(metadata->topics()->at(0)->partitions()->size() != 0){
		TEST_FAIL("ISRS != 0");
	}

	sleep(1);

	/* Second request of metadata: It have to success */
	err = producer->metadata(topic!=NULL, topic,
	                      &metadata, 5000);
	
	/* It should have now partitions */
	if(metadata->topics()->at(0)->partitions()->size() == 0){
		TEST_FAIL("ISRS == 0");
	}


	delete topic;
	delete producer;
	delete tconf;
	delete conf;

	/* Wait for everything to be cleaned up since broker destroys are
	 * handled in its own thread. */
	test_wait_exit(10);

	/* If we havent failed at this point then
	 * there were no threads leaked */
	return;
}

/**
 * Program entry point: calls test_conf_init() from the test harness and
 * then runs the C++ metadata test. Always returns 0; failures are
 * reported from within the test itself.
 */
int main (int argc, char **argv) {
	/* Command line arguments are not used by this test. */
	(void)argc;
	(void)argv;

	/* No conf objects are requested here. NOTE(review): the 20 is
	 * presumably a timeout in seconds; confirm against test.h. */
	test_conf_init(NULL, NULL, 20);

	test_metadata_cpp();

	return 0;
}