/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2015 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rdkafka_int.h"
#include "rdkafka_assignor.h"
/**
 * Source: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
 *
 * The roundrobin assignor lays out all the available partitions and all the
 * available consumers. It then proceeds to do a roundrobin assignment from
 * partition to consumer. If the subscriptions of all consumer instances are
 * identical, then the partitions will be uniformly distributed (i.e., the
 * partition ownership counts of any two consumers differ by at most one).
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and
 * t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
 * t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 *
 * Implementation notes:
 *  - \p eligible_topics and \p members are sorted in place (by the
 *    rd_kafka_assignor_topic_cmp and rd_kafka_group_member_cmp comparators)
 *    so the walk order is deterministic.
 *  - A single cursor into \p members is carried across all topics and
 *    partitions. For each partition the cursor advances cyclically over the
 *    \p member_cnt members until it reaches one subscribed to the topic;
 *    that member receives the partition (added to its rkgm_assignment list).
 *  - If no member is subscribed to a topic, that topic's partitions are
 *    left unassigned.
 *
 * \p member_id, \p protocol_name, \p metadata, \p errstr, \p errstr_size and
 * \p opaque are not used by this assignor.
 *
 * @returns 0 (no error).
 */
rd_kafka_resp_err_t
rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
                                        const char *member_id,
                                        const char *protocol_name,
                                        const rd_kafka_metadata_t *metadata,
                                        rd_kafka_group_member_t *members,
                                        size_t member_cnt,
                                        rd_kafka_assignor_topic_t
                                        **eligible_topics,
                                        size_t eligible_topic_cnt,
                                        char *errstr, size_t errstr_size,
                                        void *opaque) {
        size_t ti;
        size_t next = 0; /* Index into members[] of the next candidate */

        /* Sort topics by name */
        qsort(eligible_topics, eligible_topic_cnt, sizeof(*eligible_topics),
              rd_kafka_assignor_topic_cmp);

        /* Sort members by name */
        qsort(members, member_cnt, sizeof(*members),
              rd_kafka_group_member_cmp);

        for (ti = 0 ; ti < eligible_topic_cnt ; ti++) {
                rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti];
                int partition;

                /* For each topic+partition, assign one member (in a cyclic
                 * iteration) per partition until the partitions are
                 * exhausted. */
                for (partition = 0 ;
                     partition < eligible_topic->metadata->partition_cnt ;
                     partition++) {
                        rd_kafka_group_member_t *rkgm = NULL;
                        size_t probes;

                        /* Scan cyclically through members[], starting at
                         * the cursor, until we find one with a subscription
                         * to this topic. The cursor indexes the full
                         * members[] array and must therefore wrap at
                         * member_cnt (not at the topic's subscriber count).
                         * At most member_cnt probes are made so the scan
                         * can neither run off the array nor spin forever. */
                        for (probes = 0 ; probes < member_cnt ; probes++) {
                                rd_kafka_group_member_t *candidate =
                                        &members[next];

                                /* Always step past the probed member so the
                                 * next partition starts with its successor. */
                                next = (next + 1) % member_cnt;

                                if (rd_kafka_group_member_find_subscription(
                                            rk, candidate,
                                            eligible_topic->metadata->topic)) {
                                        rkgm = candidate;
                                        break;
                                }
                        }

                        if (!rkgm) {
                                /* Nobody subscribes to this topic: none of
                                 * its partitions can be assigned. */
                                rd_kafka_dbg(rk, CGRP, "ASSIGN",
                                             "roundrobin: no member "
                                             "subscribed to topic %s: "
                                             "skipping",
                                             eligible_topic->metadata->topic);
                                break;
                        }

                        rd_kafka_dbg(rk, CGRP, "ASSIGN",
                                     "roundrobin: Member \"%s\": "
                                     "assigned topic %s partition %d",
                                     rkgm->rkgm_member_id->str,
                                     eligible_topic->metadata->topic,
                                     partition);

                        rd_kafka_topic_partition_list_add(
                                rkgm->rkgm_assignment,
                                eligible_topic->metadata->topic, partition);
                }
        }

        return 0;
}