Blob Blame History Raw
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
*    this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
*    this list of conditions and the following disclaimer in the documentation
*    and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _TEST_H_
#define _TEST_H_

#include "../src/rd.h"

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#ifndef _MSC_VER
#include <unistd.h>
#endif
#include <errno.h>
#include <assert.h>
#include <time.h>

#include "rdkafka.h"
#include "tinycthread.h"
#include "rdlist.h"

#if WITH_SOCKEM
#include "sockem.h"
#endif

#include "testshared.h"
/* On MSVC builds every sscanf() call that follows this header is rewritten
 * to sscanf_s() with the same argument list.
 * NOTE(review): presumably done to silence the CRT deprecation warning for
 * sscanf(); sscanf_s() expects an extra size argument for %s/%c/%[
 * conversions, so confirm no caller uses those. */
#ifdef _MSC_VER
#define sscanf(...) sscanf_s(__VA_ARGS__)
#endif

/**
 * Test output is controlled through the "TEST_LEVEL=N" environment variable.
 * N < 2: TEST_SAY() is quiet.
 */
extern int test_level; /* TEST_SAYL(LVL,..) prints when test_level >= LVL */

extern int test_seed; /* Printed as the "random seed" by TEST_FAIL0() */
extern char test_mode[64]; /* NOTE(review): not referenced in this header */
/* Test the calling thread is currently running; RD_TLS presumably makes this
 * a per-thread variable (confirm in rd.h). All TEST_* macros dereference it. */
extern RD_TLS struct test *test_curr;
/* Non-zero: TEST_FAIL0() calls assert(0) instead of thrd_exit(0) */
extern int test_assert_on_fail;
/* Decremented by TEST_FAIL0() when a test with a mainfunc fails immediately */
extern int tests_running_cnt;
extern double test_timeout_multiplier; /* NOTE(review): not used in this header */
extern int  test_session_timeout_ms; /* Group session timeout */
extern int  test_flags; /* NOTE(review): presumably TEST_F_.. bits - confirm */
extern int  test_neg_flags; /* NOTE(review): presumably TEST_F_.. bits - confirm */

/* Mutex taken by TEST_LOCK(); the macros in this header hold it while
 * updating test_curr->state (and tests_running_cnt in TEST_FAIL0()). */
extern mtx_t test_mtx;

#define TEST_LOCK()   mtx_lock(&test_mtx)
#define TEST_UNLOCK() mtx_unlock(&test_mtx)


/* ANSI SGR escape sequences for coloured terminal output.
 * _C_CLR ("ESC[0m") resets all attributes; the others select the foreground
 * colours 31..36 (red, green, yellow, blue, magenta, cyan). */
#define _C_CLR "\033[0m"
#define _C_RED "\033[31m"
#define _C_GRN "\033[32m"
#define _C_YEL "\033[33m"
#define _C_BLU "\033[34m"
#define _C_MAG "\033[35m"
#define _C_CYA "\033[36m"

/**
 * State of a single test, kept in the `state` member of struct test.
 * Values are consecutive and start at zero.
 */
typedef enum {
        TEST_NOT_STARTED = 0,
        TEST_SKIPPED     = 1,  /**< Set by TEST_SKIP() */
        TEST_RUNNING     = 2,
        TEST_PASSED      = 3,
        TEST_FAILED      = 4   /**< Set by TEST_FAIL0() */
} test_state_t;

/**
 * One test case: its static description followed by its runtime state.
 */
struct test {
        /*
         * Setup
         */

        /** Test name, e.g. the test's file name minus the extension.
         *  Printed in the prefix of every TEST_SAY()/TEST_WARN() line. */
        const char *name;

        /** The test's main function. TEST_FAIL0() treats a test that has
         *  one as running in its own thread. */
        int (*mainfunc) (int argc, char **argv);

        /** Test flags, see TEST_F_.. below. */
        const int flags;

/** Test is local, no broker requirement */
#define TEST_F_LOCAL       0x1
/** Known issue, can fail without affecting total test run status. */
#define TEST_F_KNOWN_ISSUE 0x2
/** Manual test, only started when specifically stated */
#define TEST_F_MANUAL      0x4

        /** Limit tests to broker version range. */
        int minver;
        int maxver;

        /** Extra information to print in test_summary. */
        const char *extra;

        /** Test-specific reporting, JSON array of objects. */
        char **report_arr;
        int    report_cnt;
        int    report_size;

        /** Expected error in test_dr_cb */
        rd_kafka_resp_err_t exp_dr_err;
        /** test_produce_sync() call in action */
        int                 produce_sync;
        /** DR error */
        rd_kafka_resp_err_t produce_sync_err;

        /*
         * Runtime
         */
        thrd_t       thrd;
        /** Start time; the logging macros print
         *  (test_clock() - start) / 1000000 as elapsed seconds. */
        int64_t      start;
        int64_t      duration;
        FILE        *stats_fp;
        int64_t      timeout;
        /** Updated under TEST_LOCK() by TEST_FAIL0() and TEST_SKIP(). */
        test_state_t state;

#if WITH_SOCKEM
        rd_list_t sockets;
        int (*connect_cb) (struct test *test, sockem_t *skm, const char *id);
#endif

        int (*is_fatal_cb) (rd_kafka_t *rk,
                            rd_kafka_resp_err_t err,
                            const char *reason);
};


/* Platform-conditional variants of TEST_F_KNOWN_ISSUE: each one expands to
 * TEST_F_KNOWN_ISSUE when compiling for the named platform and to 0
 * otherwise, so OR:ing it into a flags value is a no-op elsewhere. */

/* Known issue only when built with MSVC (_MSC_VER defined). */
#ifdef _MSC_VER
#define TEST_F_KNOWN_ISSUE_WIN32  TEST_F_KNOWN_ISSUE
#else
#define TEST_F_KNOWN_ISSUE_WIN32 0
#endif

/* Known issue only when built for Apple platforms (__APPLE__ defined). */
#ifdef __APPLE__
#define TEST_F_KNOWN_ISSUE_OSX  TEST_F_KNOWN_ISSUE
#else
#define TEST_F_KNOWN_ISSUE_OSX  0
#endif


/**
 * Report a failure of the current test (test_curr) on stderr and mark the
 * test as TEST_FAILED.
 *
 * @param file,line  Source location printed in the failure banner.
 * @param do_lock    Non-zero: take TEST_LOCK() around the state update.
 * @param fail_now   Non-zero: terminate after reporting.
 *                   Zero: only record the failure and let the caller continue.
 * @param ...        printf-style format string and arguments with the reason.
 *
 * Steps, in order:
 *  1. Print "TEST FAILURE" through TEST_SAYL(0, ..), then a red banner with
 *     the test name, file:line and the enclosing function (__FUNCTION__ is
 *     evaluated where the macro is expanded), the caller's message and the
 *     value of test_seed.
 *  2. Set test_curr->state to TEST_FAILED. If fail_now is set and the test
 *     has a mainfunc, also decrement tests_running_cnt and remember that the
 *     test runs as a thread (is_thrd).
 *  3. If fail_now is zero, `break` out of the enclosing do { } while (0),
 *     i.e. return to the caller.
 *  4. Otherwise call assert(0) when test_assert_on_fail is set or the test
 *     is not a thread, else end the calling thread with thrd_exit(0).
 *
 * The body declares a local `is_thrd`, so the macro must be used as a
 * statement.
 */
#define TEST_FAIL0(file,line,do_lock,fail_now,...) do {                 \
                int is_thrd = 0;                                        \
		TEST_SAYL(0, "TEST FAILURE\n");				\
		fprintf(stderr, "\033[31m### Test \"%s\" failed at %s:%i:%s(): ###\n", \
			test_curr->name,                                \
                        file, line,__FUNCTION__);                \
		fprintf(stderr, __VA_ARGS__);				\
		fprintf(stderr, "\n");					\
                fprintf(stderr, "### Test random seed was %i ###\033[0m\n",    \
                        test_seed);                                     \
                if (do_lock)                                            \
                        TEST_LOCK();                                    \
                test_curr->state = TEST_FAILED;                         \
                if (fail_now && test_curr->mainfunc) {                  \
                        tests_running_cnt--;                            \
                        is_thrd = 1;                                    \
                }                                                       \
                if (do_lock)                                            \
                        TEST_UNLOCK();                                  \
		if (!fail_now) break;					\
                if (test_assert_on_fail || !is_thrd)                    \
                        assert(0);                                      \
                else                                                    \
                        thrd_exit(0);                                   \
	} while (0)

/* Report a failure (printf-style arguments) and abort the test:
 * TEST_FAIL0() with locking enabled and fail_now set. */
#define TEST_FAIL(...) TEST_FAIL0(__FILE__,__LINE__,1,1,__VA_ARGS__)

/* Report a failure right away and mark the test as failed, but let the test
 * continue: TEST_FAIL0() with locking enabled and fail_now cleared. */
#define TEST_FAIL_LATER(...) TEST_FAIL0(__FILE__,__LINE__,1,0,__VA_ARGS__)

/* If the current test has already been marked TEST_FAILED (e.g. by
 * TEST_FAIL_LATER()), abort it now through TEST_FAIL().
 * The optional arguments are a printf-style format and its arguments; the
 * format must be a string literal since it is concatenated onto
 * "See previous errors. ". */
#define TEST_LATER_CHECK(...) do {                              \
        if (test_curr->state == TEST_FAILED)                    \
                TEST_FAIL("See previous errors. " __VA_ARGS__); \
        } while (0)

/* Evaluate `call` once; if it yields zero/false, fail the test with the
 * stringified call and rd_strerror(errno) as the reason. */
#define TEST_PERROR(call) do {						\
		if (!(call))						\
			TEST_FAIL(#call " failed: %s", rd_strerror(errno)); \
	} while (0)

/* Print a warning to stderr in yellow, regardless of test_level.
 * Output is "[<test name>/<elapsed>s] WARN: " followed by the caller's
 * printf-style message. <elapsed> is (test_clock() - test_curr->start)
 * divided by 1000000 (presumably microseconds to seconds - confirm), or 0
 * while test_curr->start is still zero.
 * No newline is appended; the caller's format supplies it. */
#define TEST_WARN(...) do {                                              \
                fprintf(stderr, "\033[33m[%-28s/%7.3fs] WARN: ",	\
			test_curr->name,                                \
			test_curr->start ?                              \
			((float)(test_clock() -                         \
                                 test_curr->start)/1000000.0f) : 0);    \
		fprintf(stderr, __VA_ARGS__);				\
                fprintf(stderr, "\033[0m");                             \
	} while (0)

/* Raw printf to stderr: no prefix, no colour, no level check. */
#define TEST_SAY0(...)  fprintf(stderr, __VA_ARGS__)
/* Print a printf-style message to stderr in cyan, but only when
 * test_level >= LVL. Uses the same "[<test name>/<elapsed>s] " prefix as
 * TEST_WARN(). */
#define TEST_SAYL(LVL,...) do {						\
	if (test_level >= LVL) {                                        \
                fprintf(stderr, "\033[36m[%-28s/%7.3fs] ",		\
			test_curr->name,                                \
			test_curr->start ?                              \
			((float)(test_clock() -                         \
                                 test_curr->start)/1000000.0f) : 0);    \
		fprintf(stderr, __VA_ARGS__);				\
                fprintf(stderr, "\033[0m");                             \
        }                                                               \
	} while (0)
/* Standard test output: shown when test_level is 2 or higher. */
#define TEST_SAY(...) TEST_SAYL(2, __VA_ARGS__)

/**
 * Append JSON object (as string) to this tests' report array.
 */
#define TEST_REPORT(...) test_report_add(test_curr, __VA_ARGS__)


/* "..." is a failure reason in printf format, include as much info as needed */
#define TEST_ASSERT(expr,...) do {            \
        if (!(expr)) {                        \
                      TEST_FAIL("Test assertion failed: \"" # expr  "\": " \
                                __VA_ARGS__);                           \
                      }                                                 \
        } while (0)

/* Skip the current test. Argument is textual reason (printf format) */
#define TEST_SKIP(...) do {		     \
		TEST_WARN("SKIPPING TEST: " __VA_ARGS__); \
		TEST_LOCK();			     \
		test_curr->state = TEST_SKIPPED;     \
		TEST_UNLOCK();			     \
	} while (0)


/* Test configuration and shutdown helpers.
 * NOTE(review): conf/topic_conf are pointer-to-pointer, presumably
 * out-parameters - confirm against the implementation. */
void test_conf_init (rd_kafka_conf_t **conf,
                     rd_kafka_topic_conf_t **topic_conf,
                     int timeout);

void test_wait_exit (int timeout);

/* Test id generation: as a number, into a caller-provided buffer, or as a
 * string whose storage is not provided by the caller. */
uint64_t    test_id_generate (void);
char       *test_str_id_generate (char *dest, size_t dest_size);
const char *test_str_id_generate_tmp (void);

/* Message formatting and parsing, keyed on testid, partition and msgid. */
void test_msg_fmt (char *dest,
                   size_t dest_size,
                   uint64_t testid,
                   int32_t partition,
                   int msgid);

void test_msg_parse0 (const char *func,
                      int line,
                      uint64_t testid,
                      rd_kafka_message_t *rkmessage,
                      int32_t exp_partition,
                      int *msgidp);

/* Calls test_msg_parse0() with the call site's function name and line. */
#define test_msg_parse(testid,rkmessage,exp_partition,msgidp)           \
        test_msg_parse0(__FUNCTION__, __LINE__,                         \
                        testid, rkmessage, exp_partition, msgidp)


/**
 * Return a pseudo-random integer in the inclusive range [low, high],
 * drawn from rand().
 *
 * @param low   Smallest value that may be returned.
 * @param high  Largest value that may be returned.
 *
 * @returns a value in [low, high]; `low` if the range is empty or inverted
 *          (high < low).
 */
static RD_INLINE int jitter (int low, int high) RD_UNUSED;
static RD_INLINE int jitter (int low, int high) {
        int span = (high - low) + 1;

        /* high == low - 1 gives span == 0 and the remainder below would be
         * a division by zero (undefined behaviour); a negative span has no
         * meaningful range either. Fall back to the lower bound. */
        if (span <= 0)
                return low;

        return low + (rand() % span);
}



/******************************************************************************
 *
 * Helpers
 *
 ******************************************************************************/



/****************************************************************
 * Message verification services				*
 *								*
 *								*
 *								*
 ****************************************************************/


/**
 * A test_msgver_t is first fed with messages from any number of
 * topics and partitions, it is then checked for expected messages, such as:
 *   - all messages received, based on message payload information.
 *   - messages received in order
 *   - EOF
 */
typedef struct test_msgver_s {
        struct test_mv_p **p;  /* Partitions array */
        int p_cnt;             /* Partition count */
        int p_size;            /* p size */
        int msgcnt;            /* Total message count */
        uint64_t testid;       /* Only accept messages for this testid */

        struct test_msgver_s *fwd;  /* Also forward add_msg() to this mv */

        int log_cnt;           /* Current number of warning logs */
        int log_max;           /* Max warning logs before suppressing. */
        int log_suppr_cnt;     /* Number of suppressed log messages. */

        const char *msgid_hdr; /**< msgid string is in header by this name,
                                * rather than in the payload (default). */
} test_msgver_t;

/* A single recorded message */
struct test_mv_m {
        int64_t offset;    /* Message offset */
        int     msgid;     /* Message id */
        int64_t timestamp; /* Message timestamp */
};


/* Message vector: array of recorded messages */
struct test_mv_mvec {
        struct test_mv_m *m;  /* Message array */
        int cnt;              /* Number of messages in m[] */
        int size;             /* m[] size */
};

/* Per topic+partition state */
struct test_mv_p {
        char *topic;
        int32_t partition;
        struct test_mv_mvec mvec;  /* Messages seen on this partition */
        int64_t eof_offset;        /* NOTE(review): presumably the offset at
                                    * which EOF was seen - confirm */
};

/* Verification state, passed by value to test_msgver_verify0().
 *
 * This is a type declaration only: the header must not define an object
 * here, since it is included by several translation units and a file-scope
 * variable would be defined once per unit (duplicate-symbol link errors
 * when built with -fno-common). */
struct test_mv_vs {
        int msg_base;
        int exp_cnt;

        /* used by verify_range */
        int msgid_min;
        int msgid_max;
        int64_t timestamp_min;
        int64_t timestamp_max;

        struct test_mv_mvec mvec;

        /* Correct msgver for comparison */
        test_msgver_t *corr;
};


/* Set up and tear down a message verifier. */
void test_msgver_init (test_msgver_t *mv, uint64_t testid);
void test_msgver_clear (test_msgver_t *mv);

/* Add one message to a verifier, given as individual fields. */
int test_msgver_add_msg00 (const char *func,
                           int line,
                           test_msgver_t *mv,
                           uint64_t testid,
                           const char *topic,
                           int32_t partition,
                           int64_t offset,
                           int64_t timestamp,
                           rd_kafka_resp_err_t err,
                           int msgnum);

/* Add one message to a verifier, given as an rd_kafka_message_t. */
int test_msgver_add_msg0 (const char *func,
                          int line,
                          test_msgver_t *mv,
                          rd_kafka_message_t *rkm);

/* Calls test_msgver_add_msg0() with the call site's function and line. */
#define test_msgver_add_msg(mv,rkm)                                     \
        test_msgver_add_msg0(__FUNCTION__, __LINE__, mv, rkm)

/**
 * Bit flags telling the msgver verifiers what to check.
 */
#define TEST_MSGVER_ORDER    (1 << 0)  /* Order */
#define TEST_MSGVER_DUP      (1 << 1)  /* Duplicates */
#define TEST_MSGVER_RANGE    (1 << 2)  /* Range of messages */

/* All verifiers. The mask is the full low nibble, so it also carries bit
 * 0x8, which has no named flag of its own in this header. */
#define TEST_MSGVER_ALL      0xf

/**
 * Bit flags selecting the key that messages are verified by.
 */
#define TEST_MSGVER_BY_MSGID     (1 << 16) /* By msgid (unique in testid) */
#define TEST_MSGVER_BY_OFFSET    (1 << 17) /* By offset (unique in partition) */
#define TEST_MSGVER_BY_TIMESTAMP (1 << 18) /* By timestamp range */

/* Per-partition verification only, not across all messages received on all
 * partitions. Useful for incremental verification with multiple partitions
 * before the total number of messages has been received.
 * The range check is masked out: messages may be spread over several
 * partitions of which only a few may have been read so far. */
#define TEST_MSGVER_PER_PART                                            \
        ((TEST_MSGVER_ALL & ~TEST_MSGVER_RANGE) |                       \
         TEST_MSGVER_BY_MSGID | TEST_MSGVER_BY_OFFSET)

/* Verification across all messages on all partitions.
 * Keyed on msgid only, since offsets are partition local. */
#define TEST_MSGVER_ALL_PART (TEST_MSGVER_ALL | TEST_MSGVER_BY_MSGID)


/* Verify the messages of a single topic+partition.
 * `flags` is a combination of TEST_MSGVER_.. bits. */
int test_msgver_verify_part0 (const char *func,
                              int line,
                              const char *what,
                              test_msgver_t *mv,
                              int flags,
                              const char *topic,
                              int partition,
                              int msg_base,
                              int exp_cnt);

/* Calls test_msgver_verify_part0() with the call site's function and line. */
#define test_msgver_verify_part(what,mv,flags,topic,partition,msg_base,exp_cnt) \
        test_msgver_verify_part0(__FUNCTION__, __LINE__,                \
                                 what, mv, flags, topic, partition,     \
                                 msg_base, exp_cnt)

/* Verify a msgver against the verification state `vs` (passed by value). */
int test_msgver_verify0 (const char *func,
                         int line,
                         const char *what,
                         test_msgver_t *mv,
                         int flags,
                         struct test_mv_vs vs);

/* Calls test_msgver_verify0() with the call site's function and line and a
 * temporary verification state in which only msg_base and exp_cnt are set;
 * all other members are zero-initialised. */
#define test_msgver_verify(what,mv,flags,msgbase,expcnt)                \
        test_msgver_verify0(__FUNCTION__, __LINE__,                     \
                            what, mv, flags,                            \
                            (struct test_mv_vs){ .msg_base = msgbase,   \
                                                 .exp_cnt = expcnt })


/* Compare msgver `mv` against the reference msgver `corr`. */
void test_msgver_verify_compare0 (const char *func,
                                  int line,
                                  const char *what,
                                  test_msgver_t *mv,
                                  test_msgver_t *corr,
                                  int flags);

/* Calls test_msgver_verify_compare0() with the call site's function/line. */
#define test_msgver_verify_compare(what,mv,corr,flags)                  \
        test_msgver_verify_compare0(__FUNCTION__, __LINE__,             \
                                    what, mv, corr, flags)

/* Create a client handle from `conf`.
 * NOTE(review): `mode` is presumably an rd_kafka_type_t value - confirm. */
rd_kafka_t *test_create_handle (int mode, rd_kafka_conf_t *conf);

/**
 * Delivery report callback.
 * Called once for each message to signal its delivery status.
 */
void test_dr_cb (rd_kafka_t *rk,
                 void *payload,
                 size_t len,
                 rd_kafka_resp_err_t err,
                 void *opaque,
                 void *msg_opaque);

/* Producer and producer-topic creation. The variadic tail of
 * test_create_producer_topic() is not described in this header. */
rd_kafka_t       *test_create_producer (void);
rd_kafka_topic_t *test_create_producer_topic (rd_kafka_t *rk,
                                              const char *topic, ...);

void test_wait_delivery (rd_kafka_t *rk, int *msgcounterp);

/* Produce `cnt` messages starting at `msg_base`. The _nowait variant
 * additionally takes a message counter pointer. */
void test_produce_msgs_nowait (rd_kafka_t *rk,
                               rd_kafka_topic_t *rkt,
                               uint64_t testid,
                               int32_t partition,
                               int msg_base,
                               int cnt,
                               const char *payload,
                               size_t size,
                               int *msgcounterp);

void test_produce_msgs (rd_kafka_t *rk,
                        rd_kafka_topic_t *rkt,
                        uint64_t testid,
                        int32_t partition,
                        int msg_base,
                        int cnt,
                        const char *payload,
                        size_t size);

rd_kafka_resp_err_t test_produce_sync (rd_kafka_t *rk,
                                       rd_kafka_topic_t *rkt,
                                       uint64_t testid,
                                       int32_t partition);

/* Create a consumer for `group_id` with an optional rebalance callback and
 * the given configuration objects. */
rd_kafka_t *test_create_consumer (
        const char *group_id,
        void (*rebalance_cb) (rd_kafka_t *rk,
                              rd_kafka_resp_err_t err,
                              rd_kafka_topic_partition_list_t *partitions,
                              void *opaque),
        rd_kafka_conf_t *conf,
        rd_kafka_topic_conf_t *default_topic_conf);

/* Topic object creation. The variadic tail of test_create_topic_object()
 * is not described in this header. */
rd_kafka_topic_t *test_create_consumer_topic (rd_kafka_t *rk,
                                              const char *topic);
rd_kafka_topic_t *test_create_topic_object (rd_kafka_t *rk,
                                            const char *topic, ...);

/* Start, stop and seek a consumer on one topic+partition; `what` is a
 * caller-supplied description string. */
void test_consumer_start (const char *what,
                          rd_kafka_topic_t *rkt,
                          int32_t partition,
                          int64_t start_offset);
void test_consumer_stop (const char *what,
                         rd_kafka_topic_t *rkt,
                         int32_t partition);
void test_consumer_seek (const char *what,
                         rd_kafka_topic_t *rkt,
                         int32_t partition,
                         int64_t offset);

/* NOTE(review): presumably passed as `offset` to test_consume_msgs() to
 * request that no seek is performed - confirm. */
#define TEST_NO_SEEK  -1
int64_t test_consume_msgs (const char *what,
                           rd_kafka_topic_t *rkt,
                           uint64_t testid,
                           int32_t partition,
                           int64_t offset,
                           int exp_msg_base,
                           int exp_cnt,
                           int parse_fmt);


void test_verify_rkmessage0 (const char *func,
                             int line,
                             rd_kafka_message_t *rkmessage,
                             uint64_t testid,
                             int32_t partition,
                             int msgnum);

/* Calls test_verify_rkmessage0() with the call site's function and line. */
#define test_verify_rkmessage(rkmessage,testid,partition,msgnum)        \
        test_verify_rkmessage0(__FUNCTION__, __LINE__,                  \
                               rkmessage, testid, partition, msgnum)

void test_consumer_subscribe (rd_kafka_t *rk, const char *topic);

/* Consume from `topic` as a member of `group_id`, given the expected EOF and
 * message counts. The _mv variant additionally takes a message verifier. */
void test_consume_msgs_easy_mv (const char *group_id,
                                const char *topic,
                                uint64_t testid,
                                int exp_eofcnt,
                                int exp_msgcnt,
                                rd_kafka_topic_conf_t *tconf,
                                test_msgver_t *mv);

void test_consume_msgs_easy (const char *group_id,
                             const char *topic,
                             uint64_t testid,
                             int exp_eofcnt,
                             int exp_msgcnt,
                             rd_kafka_topic_conf_t *tconf);

/* Consumer polling helpers. */
void test_consumer_poll_no_msgs (const char *what,
                                 rd_kafka_t *rk,
                                 uint64_t testid,
                                 int timeout_ms);

int test_consumer_poll_once (rd_kafka_t *rk,
                             test_msgver_t *mv,
                             int timeout_ms);

int test_consumer_poll (const char *what,
                        rd_kafka_t *rk,
                        uint64_t testid,
                        int exp_eof_cnt,
                        int exp_msg_base,
                        int exp_cnt,
                        test_msgver_t *mv);

/* Partition assignment. */
void test_consumer_assign (const char *what,
                           rd_kafka_t *rk,
                           rd_kafka_topic_partition_list_t *parts);
void test_consumer_unassign (const char *what, rd_kafka_t *rk);

void test_consumer_close (rd_kafka_t *rk);

void test_flush (rd_kafka_t *rk, int timeout_ms);

/* Configuration property helpers, operating on name/value strings. */
void  test_conf_set (rd_kafka_conf_t *conf,
                     const char *name, const char *val);
char *test_conf_get (const rd_kafka_conf_t *conf, const char *name);
int   test_conf_match (rd_kafka_conf_t *conf,
                       const char *name, const char *val);
void  test_topic_conf_set (rd_kafka_topic_conf_t *tconf,
                           const char *name, const char *val);
void  test_any_conf_set (rd_kafka_conf_t *conf,
                         rd_kafka_topic_conf_t *tconf,
                         const char *name, const char *val);

void test_print_partition_list (
        const rd_kafka_topic_partition_list_t *partitions);

/* Topic helpers. */
void test_kafka_topics (const char *fmt, ...);
void test_create_topic (const char *topicname,
                        int partition_cnt,
                        int replication_factor);
rd_kafka_resp_err_t test_auto_create_topic_rkt (rd_kafka_t *rk,
                                                rd_kafka_topic_t *rkt);
rd_kafka_resp_err_t test_auto_create_topic (rd_kafka_t *rk,
                                            const char *name);
int test_check_auto_create_topic (void);

int test_get_partition_count (rd_kafka_t *rk, const char *topicname);

int test_check_builtin (const char *feature);

/* printf-style string formatting; RD_FORMAT marks argument 1 as the format
 * string and argument 2 as the first format argument. */
char *tsprintf (const char *fmt, ...) RD_FORMAT(printf, 1, 2);

/* Target of the TEST_REPORT() macro. */
void test_report_add (struct test *test, const char *fmt, ...);
int  test_can_create_topics (int skip);

rd_kafka_event_t *test_wait_event (rd_kafka_queue_t *eventq,
                                   rd_kafka_event_type_t event_type,
                                   int timeout_ms);

/* Fill the caller-provided `val` and `key` buffers for one message. */
void test_prepare_msg (uint64_t testid,
                       int32_t partition,
                       int msg_id,
                       char *val, size_t val_size,
                       char *key, size_t key_size);

/* Socket helpers, only declared when built with sockem support. */
#if WITH_SOCKEM
void test_socket_enable (rd_kafka_conf_t *conf);
void test_socket_close_all (struct test *test, int reinit);
int  test_socket_sockem_set_all (const char *key, int val);
#endif

void test_headers_dump (const char *what,
                        int lvl,
                        const rd_kafka_headers_t *hdrs);

#endif /* _TEST_H_ */