Accessing the RocksDB state store in Kafka Streams 0.10.0

02 Jan 2017

Overview

The RocksDB state store that Kafka Streams uses to persist local state is a little hard to get to in version 0.10.0 when using the Kafka Streams DSL. While this issue was addressed and fixed in version 0.10.1, the wire protocol changes released in Kafka 0.10.1 require users to upgrade both their clients and their brokers, so some people may be stuck with 0.10.0 for the time being. This article will show users who are using the older Kafka Streams client libs how to access the underlying state store.

Motivation

So, why would a user even need to access the Kafka Streams state store directly? One of the primary reasons for doing so is for metrics collection. For example, as a developer, you may want to know how many entries are in your state store at a given point in time (Related: KAFKA-3753). This will allow you to monitor the growth of the state store over time, or even debug KTable bootstrapping issues like the one detailed here. This article will focus on the metrics collection use-case, but once you learn how to access the state store directly, you can adapt the code to different use-cases very easily.

Finding the state store

In order to access the state store, we need to know four things:

1. The state store directory (STATE_DIR_CONFIG)
2. The application ID (APPLICATION_ID_CONFIG)
3. The Task ID that Kafka assigned to the running instance
4. The state store name

Once we have the above information, we can find each individual state store at the following location:

$STATE_DIR_CONFIG/$APPLICATION_ID_CONFIG/$TASK_ID/rocksdb/$STATE_STORE_NAME

For example, if our STATE_DIR_CONFIG is set to the default value, APPLICATION_ID_CONFIG is set to example_app_dev, and Kafka assigned a Task ID of 1_9 to our stream processing application, we could access a state store named mysourcetopic by connecting a RocksDB client (covered later) to the following path:

/tmp/kafka-streams/example_app_dev/1_9/rocksdb/mysourcetopic
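In code, building this path is just string formatting. Here is a small sketch; all four values are placeholders standing in for the configuration values and Task ID covered in the rest of this article:

// Placeholder values; the rest of this article shows where each one comes from
String stateDir = "/tmp/kafka-streams";   // STATE_DIR_CONFIG (the default)
String applicationId = "example_app_dev"; // APPLICATION_ID_CONFIG
String taskId = "1_9";                    // Task ID assigned by Kafka
String storeName = "mysourcetopic";       // state store name

String stateStoreDir = String.format("%s/%s/%s/rocksdb/%s",
        stateDir, applicationId, taskId, storeName);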

Since the state store directory and the application ID are set in the StreamsConfig, I will assume you know how to extract those values. The state store name (e.g. mysourcetopic in the example above) will be set to the topic that you initialize your KTable with. For example:

KStreamBuilder builder = new KStreamBuilder();
// In 0.10.0, the KTable's state store name defaults to the source topic name
builder.table("mysourcetopic");

The more difficult value to get is the Task ID, so we will cover that next.

Getting the Task ID

When you run your Kafka Streams app, a Task ID will be created for each partition and topic group assigned to the running Kafka Streams instance (see here). So, we just need a way for our code to be notified when a new Task ID is assigned to our streams instance. Luckily, the client libs provide a configuration parameter called PARTITION_GROUPER_CLASS_CONFIG, which allows us to specify a class that can be used for intercepting Task IDs as they are created. We could even define how these Task IDs are created, but in the example below, we will just delegate to the default partition grouper, which is probably what you will want to do as well.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;

public class StreamsApp {

    private static List<String> taskIds = new ArrayList<>();

    // ... more properties, methods, etc

    public static class StreamsPartitionGrouper implements PartitionGrouper {
        private PartitionGrouper defaultPartitionGrouper = new DefaultPartitionGrouper();

        public StreamsPartitionGrouper() {}

        @Override
        public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
            // Delegate the actual grouping to the default implementation
            Map<TaskId, Set<TopicPartition>> result = defaultPartitionGrouper.partitionGroups(topicGroups, metadata);
            for (Map.Entry<TaskId, Set<TopicPartition>> entry : result.entrySet()) {
                // Yay! now we have the Task ID.
                TaskId taskId = entry.getKey();
                // Save this to a list so we can access it later
                if (!taskIds.contains(taskId.toString())) {
                    taskIds.add(taskId.toString());
                }
            }
            return result;
        }
    }
}

In order for this code to actually get called, be sure to specify this class when creating your StreamsConfig.

Properties streamingProps = new Properties();
streamingProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
streamingProps.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, StreamsApp.StreamsPartitionGrouper.class.getName());
// etc
StreamsConfig streamsConfig = new StreamsConfig(streamingProps);
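
With the grouper registered, the rest of the wiring is unchanged. Here is a minimal sketch of starting the application, reusing the topic from our earlier example (your usual connection settings go where the // etc placeholder sits above):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// ...

KStreamBuilder builder = new KStreamBuilder();
builder.table("mysourcetopic");

// Once the app starts and tasks are assigned, our grouper populates taskIds
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();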

Putting it all together

Now that we know how to get the path to each state store directory, we just need to connect to it using the RocksDB client. To open a read-only instance, use the following code.

RocksDB db = RocksDB.openReadOnly(stateStoreDir);

You can also open a writeable instance with the following, though note that RocksDB locks the database directory for writes, so this will typically fail while the Streams application itself has the store open:

RocksDB db = RocksDB.open(stateStoreDir);

With our RocksDB instance created, extracting metrics is a breeze. For example, we can get the estimated number of keys in our state store with the following code:

db.getLongProperty("rocksdb.estimate-num-keys");
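
Putting the whole flow into one place, here is a minimal, self-contained sketch of a metrics probe (the StateStoreMetrics class name is just for illustration). The path is the one from our earlier example; in a real application, the Task ID segment would come from the taskIds list populated by our StreamsPartitionGrouper:

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class StateStoreMetrics {

    public static void main(String[] args) throws RocksDBException {
        // Load the RocksDB JNI library (idempotent, so it is safe to call again)
        RocksDB.loadLibrary();

        // Example path from earlier in the article; in a real app, build this
        // from your config values and the Task ID captured by the grouper
        String stateStoreDir = "/tmp/kafka-streams/example_app_dev/1_9/rocksdb/mysourcetopic";

        // Open read-only so we do not interfere with the running Streams instance
        RocksDB db = RocksDB.openReadOnly(stateStoreDir);
        try {
            // Estimated number of keys currently in the state store
            long estimatedKeys = db.getLongProperty("rocksdb.estimate-num-keys");
            System.out.println("Estimated keys: " + estimatedKeys);
        } finally {
            db.close(); // on older rocksdbjni versions, use dispose() instead
        }
    }
}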

Thanks for reading, and if you have any questions, please feel free to reach out.