
One of the most common use cases in a streaming application is joining two different input sources into a new stream. Kafka Streams enables this behavior through a variety of join operations, including join, leftJoin, and outerJoin. Since joins are a stateful operation in Kafka Streams, a state store is used whenever a join is performed. However, there is a known issue (see KAFKA-4113) that, under certain conditions, will prevent the state store from being fully materialized. When this occurs, unintended side-effects may be observed: for example, the value on one side of a leftJoin may be incorrectly marked as null.
This article describes the conditions under which this behavior manifests itself and proposes some possible workarounds.
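For illustration, here is a minimal sketch of a KStream-KTable leftJoin using the StreamsBuilder DSL. The class name, topic names, and String serdes are assumptions made for this example, not part of the original setup:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinExample {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Stream of page-view events, keyed by user ID (topic names are illustrative)
        KStream<String, String> pageViews = builder.stream("page-views");

        // Table of user profiles, also keyed by user ID and backed by a state store
        KTable<String, String> userProfiles = builder.table("user-profiles");

        // leftJoin emits a result for every page-view record; the profile side is
        // null whenever the key is not (yet) present in the KTable's state store
        KStream<String, String> enriched = pageViews.leftJoin(
                userProfiles,
                (view, profile) -> view + ", profile=" + profile);

        enriched.to("enriched-page-views");
        return builder.build();
    }
}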
Before diving into the possible solutions, let's first reproduce the issue so that we can understand the circumstances that lead to the state store not being fully materialized.
First, set up a Kafka broker (and ZooKeeper) using the wurstmeister/kafka-docker image.
$ docker-machine create --driver virtualbox default
$ eval $(docker-machine env)
$ git clone git@github.com:wurstmeister/kafka-docker.git && cd kafka-docker
Place the following configuration in a file named docker-compose-kafka-all.yml.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
Now, start the containers.
$ docker-compose -f docker-compose-kafka-all.yml up -d
If everything goes well, you should be able to telnet to both the broker and the ZooKeeper node.
$ telnet $(docker-machine ip default) 9092
$ telnet $(docker-machine ip default) 2181
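With the broker reachable, a Streams application that performs such a join can be pointed at it to observe the issue: if the table topic already contains data when the application first starts, some leftJoin results may come back with a null table-side value because the state store has not yet been fully materialized. Below is a sketch of a driver for the example topology shown earlier; the class name, application ID, and buildTopology() helper are assumptions for illustration, the bootstrap address comes from the docker-compose file, and the topics referenced in the topology would still need to be created on the broker:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

public class JoinExampleDriver {

    public static void main(String[] args) {
        Properties props = new Properties();
        // The application ID also serves as the consumer group ID for the Streams app
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-example");
        // Broker address advertised by the docker-compose setup above
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KafkaStreams streams = new KafkaStreams(JoinExample.buildTopology(), props);
        streams.start();

        // Close the streams application cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}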
One workaround is to commit initial offsets for the KTable's source topic, under the same group ID as the Streams application, before the topology starts. The method below does this with a plain consumer: on partition assignment it seeks any partition that has no committed offset back to the beginning, then relies on auto-commit to persist those positions so the state store is built from the start of the topic.

public <K, V> void forceKTableBootstrap(String sourceTopic,
                                        Deserializer<K> keyDeserializer,
                                        Deserializer<V> valueDeserializer) {
    try {
        // Reuse the Streams configuration, but consume under the same group ID as
        // the Streams application (its application ID) so the committed offsets
        // are picked up when the topology starts.
        Map<String, Object> config = getStreamsConfig();
        String groupId = config.get(StreamsConfig.APPLICATION_ID_CONFIG).toString();
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

        KafkaConsumer<K, V> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);

        ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Any partition without a committed offset has never been
                // bootstrapped for this group; seek it back to the beginning.
                List<TopicPartition> unInitialized = new ArrayList<>();
                for (TopicPartition tp : partitions) {
                    OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
                    if (offsetAndMetadata == null) {
                        unInitialized.add(tp);
                    }
                }
                if (!unInitialized.isEmpty()) {
                    log.info("Bootstrapping {} state stores for topic: {}", unInitialized.size(), sourceTopic);
                    consumer.seekToBeginning(unInitialized);
                } else {
                    log.info("{} state stores have already been bootstrapped for topic: {}", partitions.size(), sourceTopic);
                }
            }
        };

        consumer.subscribe(Collections.singletonList(sourceTopic), listener);

        // A single poll triggers the rebalance listener; auto-commit then persists
        // the seeked positions when the consumer is closed.
        consumer.poll(1000L);
        consumer.close();
    } catch (InconsistentGroupProtocolException e) {
        // Thrown if the Streams application has already joined the group with the
        // streams protocol; restart and try again.
        log.info("Inconsistent group protocol while bootstrapping KTable. Restarting");
        restart();
    }
}
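Assuming this method lives alongside the code that builds and starts the Streams application, it would be invoked just before starting the topology. A hypothetical call site (the topic name, serdes, and surrounding variables are placeholders):

// Hypothetical call site: commit bootstrap offsets for the KTable's source
// topic before the Streams application starts consuming it.
forceKTableBootstrap(
        "user-profiles",                      // source topic backing the KTable
        Serdes.String().deserializer(),       // key deserializer for that topic
        Serdes.String().deserializer());      // value deserializer for that topic

// topology and streamsProperties are assumed to be defined elsewhere
KafkaStreams streams = new KafkaStreams(topology, streamsProperties);
streams.start();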