Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"
#include "rdkafka.h"


#define _MSG_COUNT 10
struct latconf {
        const char *name;
        const char *conf[16];
        int min;    /* Minimum expected latency */
        int max;    /* Maximum expected latency */

        float rtt;  /* Network+broker latency */


        /* Result vector */
        float latency[_MSG_COUNT];
        float sum;
        int   cnt;
};


static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        struct latconf *latconf = opaque;
        int64_t *ts_send = (int64_t *)rkmessage->_private;
        float delivery_time;

        if (rkmessage->err)
                TEST_FAIL("%s: delivery failed: %s\n",
                          latconf->name, rd_kafka_err2str(rkmessage->err));

        if (!rkmessage->_private)
                return; /* Priming message, ignore. */

        delivery_time = (float)(test_clock() - *ts_send) / 1000.0f;

        free(ts_send);

        TEST_ASSERT(latconf->cnt < _MSG_COUNT, "");

        TEST_SAY("%s: Message %d delivered in %.3fms\n",
                 latconf->name, latconf->cnt, delivery_time);

        latconf->latency[latconf->cnt++] = delivery_time;
        latconf->sum += delivery_time;
}


static int verify_latency (struct latconf *latconf) {
        float avg;
        int fails = 0;
        double ext_overhead = latconf->rtt +
                5.0 /* broker ProduceRequest handling time, maybe */;

        ext_overhead *= test_timeout_multiplier;

        avg = latconf->sum / (float)latconf->cnt;

        TEST_SAY("%s: average latency %.3fms, allowed range %d..%d +%.0fms\n",
                 latconf->name, avg, latconf->min, latconf->max, ext_overhead);

        if (avg < (float)latconf->min ||
            avg > (float)latconf->max + ext_overhead) {
                TEST_FAIL_LATER("%s: average latency %.3fms is "
                                "outside range %d..%d +%.0fms",
                                latconf->name, avg, latconf->min, latconf->max,
                                ext_overhead);
                fails++;
        }

        return fails;
}

static void measure_rtt (struct latconf *latconf, rd_kafka_t *rk) {
        rd_kafka_resp_err_t err;
        const struct rd_kafka_metadata *md;
        int64_t ts = test_clock();

        err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000));
        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
        latconf->rtt = (float)(test_clock() - ts) / 1000.0f;

        TEST_SAY("%s: broker base RTT is %.3fms\n",
                 latconf->name, latconf->rtt);
        rd_kafka_metadata_destroy(md);
}

static int test_producer_latency (const char *topic,
                                  struct latconf *latconf) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *topic_conf;
        rd_kafka_resp_err_t err;
        int i;

        test_conf_init(&conf, &topic_conf, 60);

        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
        rd_kafka_conf_set_opaque(conf, latconf);

        TEST_SAY("%s: begin\n", latconf->name);
        for (i = 0 ; latconf->conf[i] ; i += 2) {
                TEST_SAY("%s:  set conf %s = %s\n",
                         latconf->name, latconf->conf[i], latconf->conf[i+1]);
                test_any_conf_set(conf, topic_conf,
                                  latconf->conf[i], latconf->conf[i+1]);
        }
        rd_kafka_conf_set_default_topic_conf(conf, topic_conf);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        TEST_SAY("%s: priming producer\n", latconf->name);
        /* Send a priming message to make sure everything is up
         * and functional before starting measurements */
        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_PARTITION(0),
                                RD_KAFKA_V_VALUE("priming", 7),
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                RD_KAFKA_V_END);
        if (err)
                TEST_FAIL("%s: priming producev failed: %s",
                          latconf->name, rd_kafka_err2str(err));

        /* Await delivery */
        rd_kafka_flush(rk, tmout_multip(5000));

        /* Get a network+broker round-trip-time base time. */
        measure_rtt(latconf, rk);

        TEST_SAY("%s: producing %d messages\n", latconf->name, _MSG_COUNT);
        for (i = 0 ; i < _MSG_COUNT ; i++) {
                int64_t *ts_send;

                ts_send = malloc(sizeof(*ts_send));
                *ts_send = test_clock();

                err = rd_kafka_producev(rk,
                                        RD_KAFKA_V_TOPIC(topic),
                                        RD_KAFKA_V_PARTITION(0),
                                        RD_KAFKA_V_VALUE("hi", 2),
                                        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                        RD_KAFKA_V_OPAQUE(ts_send),
                                        RD_KAFKA_V_END);
                if (err)
                        TEST_FAIL("%s: producev #%d failed: %s",
                                  latconf->name, i, rd_kafka_err2str(err));

                /* Await delivery */
                rd_kafka_flush(rk, 5000);
        }

        rd_kafka_destroy(rk);

        return verify_latency(latconf);
}



int main_0055_producer_latency (int argc, char **argv) {
        struct latconf latconfs[] = {
                { "standard settings", {NULL}, 0, 0 }, /* default is now 0ms */
                { "low queue.buffering.max.ms",
                  {"queue.buffering.max.ms", "0", NULL}, 0, 0 },
                { "high queue.buffering.max.ms",
                  {"queue.buffering.max.ms", "3000", NULL}, 3000, 3100},
                { "queue.buffering.max.ms < socket.blocking.max.ms",
                  {"queue.buffering.max.ms", "500",
                   "socket.blocking.max.ms", "3000", NULL}, 500, 600 },
                { "no acks",
                  {"queue.buffering.max.ms", "0",
                   "acks", "0", NULL}, 0, 0 },
                { NULL }
        };
        struct latconf *latconf;
        const char *topic = test_mk_topic_name("0055_producer_latency", 0);
        int fails = 0;

        /* Create topic */
        test_produce_msgs_easy(topic, 0, 0, 1);

        for (latconf = latconfs ; latconf->name ; latconf++)
                fails += test_producer_latency(topic, latconf);

        if (fails)
                TEST_FAIL("See %d previous failure(s)", fails);

        return 0;
}