
How to bootstrap KTables in Kafka Streams 0.10

Problem

One of the most common use cases in a streaming application is joining two different input sources into a new stream. Kafka Streams supports this through several join operations, including join, leftJoin, and outerJoin. Because joins are stateful in Kafka Streams, every join is backed by a state store. However, there is a known issue (see KAFKA-4113) that, under certain conditions, prevents the state store from being fully materialized. When this occurs, the following unintended side effects may be observed:

This article describes the conditions under which this behavior manifests and proposes some possible workarounds.
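To make the failure mode concrete, here is a toy model in plain Java with no Kafka dependencies. It is not the Kafka Streams API: the JoinSimulation and Event names are hypothetical, and a HashMap stands in for the KTable's state store. It mimics a KStream-KTable leftJoin, in which a stream record whose key is not yet in the table is joined with null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of a KStream-KTable left join (no Kafka involved).
// JoinSimulation and Event are hypothetical names used only for illustration.
final class JoinSimulation {

    // One input record: whether it came from the table topic, plus its key and value.
    static final class Event {
        final boolean fromTable;
        final String key;
        final String value;

        Event(boolean fromTable, String key, String value) {
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
        }
    }

    // Processes events strictly in the given order. Table events update the
    // "state store"; stream events are joined against whatever it holds right now.
    static List<String> leftJoin(List<Event> events) {
        Map<String, String> store = new HashMap<>(); // stands in for the KTable state store
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.fromTable) {
                store.put(e.key, e.value);
            } else {
                // A key missing from the store yields null, as in a leftJoin.
                out.add(e.key + ":" + e.value + "+" + store.get(e.key));
            }
        }
        return out;
    }
}
```

Feeding the same two records in different orders shows the problem: if the stream record for alice arrives before the table record for alice, the join emits alice:click+null, whereas consuming the table record first (that is, bootstrapping the table) produces alice:click+US.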

Reproducing

Before diving into the possible solutions, let's first reproduce the issue so that we can understand the circumstances that lead to the state store not being fully materialized.

Broker setup

First, set up a Kafka broker and a ZooKeeper node using the wurstmeister/kafka-docker image.

Create a docker machine

$ docker-machine create --driver virtualbox default
$ eval $(docker-machine env)

Start the Kafka and Zookeeper containers

$ git clone git@github.com:wurstmeister/kafka-docker.git && cd kafka-docker

Place the following configuration in a file named docker-compose-kafka-all.yml.

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100  # use the output of: docker-machine ip default
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Now, start the containers.

$ docker-compose -f docker-compose-kafka-all.yml up -d

If everything went well, you should be able to telnet to both the broker and the ZooKeeper node.

$ telnet $(docker-machine ip default) 9092
$ telnet $(docker-machine ip default) 2181
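If telnet is not installed, a small Java check does the same job by simply trying to open a TCP connection. PortCheck is a hypothetical helper, and the 192.168.99.100 default only mirrors the KAFKA_ADVERTISED_HOST_NAME used above; pass your own docker-machine IP as the first argument. A successful connection only shows that something is listening on the port, not that Kafka is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal stand-in for the telnet check: can we open a TCP connection to host:port?
// PortCheck is a hypothetical helper, not part of Kafka or Docker tooling.
final class PortCheck {

    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unresolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the docker-machine IP is passed as the first argument.
        String host = args.length > 0 ? args[0] : "192.168.99.100";
        System.out.println("Kafka (9092): " + isReachable(host, 9092, 2000));
        System.out.println("ZooKeeper (2181): " + isReachable(host, 2181, 2000));
    }
}
```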

Solution

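import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.serialization.Deserializer;

import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;

// getStreamsConfig(), restart() and log are members of the enclosing class (not shown).
public <K, V> void forceKTableBootstrap(String sourceTopic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    Map<String, Object> config = getStreamsConfig();
    // Join the Streams application's consumer group (group.id == application.id).
    String groupId = config.get(APPLICATION_ID_CONFIG).toString();
    config.put(GROUP_ID_CONFIG, groupId);
    config.put(ENABLE_AUTO_COMMIT_CONFIG, true);
    config.put(MAX_POLL_RECORDS_CONFIG, 1);
    // try-with-resources closes the consumer even if poll() throws.
    try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer)) {
        ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Nothing to clean up.
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // A partition with no committed offset has never been consumed by this group.
                List<TopicPartition> unInitialized = new ArrayList<>();
                for (TopicPartition tp : partitions) {
                    OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
                    if (offsetAndMetadata == null) {
                        unInitialized.add(tp);
                    }
                }
                if (!unInitialized.isEmpty()) {
                    log.info("Bootstrapping {} state stores for topic: {}", unInitialized.size(), sourceTopic);
                    consumer.seekToBeginning(unInitialized);
                } else {
                    log.info("{} state stores have already been bootstrapped for topic: {}", partitions.size(), sourceTopic);
                }
            }
        };
        consumer.subscribe(Collections.singletonList(sourceTopic), listener);
        // Joins the group (invoking the listener); with auto-commit enabled, close() commits the position.
        consumer.poll(1000L);
    } catch (InconsistentGroupProtocolException e) {
        // Raised when this consumer's assignor is incompatible with the group's current members.
        log.info("Inconsistent group protocol while bootstrapping KTable. Restarting");
        restart();
    }
}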
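The only decision the rebalance listener makes is which partitions to rewind. Pulled out into plain Java, with String partition names standing in for TopicPartition and a map standing in for consumer.committed(), it can be sketched as follows; BootstrapPlanner and partitionsNeedingBootstrap are hypothetical names, not part of Kafka.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Plain-Java restatement of the onPartitionsAssigned decision:
// only partitions without a committed offset for the group are rewound.
// BootstrapPlanner is hypothetical; P stands in for Kafka's TopicPartition.
final class BootstrapPlanner {

    static <P> List<P> partitionsNeedingBootstrap(Collection<P> assigned, Map<P, Long> committedOffsets) {
        List<P> unInitialized = new ArrayList<>();
        for (P partition : assigned) {
            // A missing entry models consumer.committed(tp) returning null.
            if (committedOffsets.get(partition) == null) {
                unInitialized.add(partition);
            }
        }
        return unInitialized;
    }
}
```

Because the offsets committed during a bootstrap run persist for the group, a second call to forceKTableBootstrap should find every partition initialized and take the else branch, so the rewind happens at most once per partition for a given application.id.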