
If you are using the default state store in Kafka Streams (RocksDB), you can inspect and interact with your state stores using ldb and sst_dump, two admin tools that can optionally be built when installing RocksDB.
Note: be careful when writing to state stores outside of Kafka Streams. I recommend performing read-only operations unless you have a very good reason to manipulate this data outside of your Kafka Streams app.
# install gflags (Google Commandline Flags)
$ brew install gflags
# clone rocksdb (the HTTPS URL works without a GitHub SSH key)
$ git clone https://github.com/facebook/rocksdb.git
# build the binaries
$ cd rocksdb
$ make ldb sst_dump
That will drop a binary called ldb, and another called sst_dump, in the current directory. Copy these to some location on your path, e.g.
$ cp ldb /usr/local/bin/
$ cp sst_dump /usr/local/bin/
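A quick way to confirm that both binaries are reachable from your shell is sketched below. The function name require_tools is my own and not part of RocksDB; it relies only on the shell's command -v builtin.

```shell
#!/bin/bash
# require_tools is a hypothetical helper name (not part of RocksDB).
# It reports every tool that cannot be found on the PATH and returns
# non-zero if at least one of them is missing.
require_tools() {
  local tool
  local missing=0
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "not found on PATH: $tool" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Running require_tools ldb sst_dump should print nothing and return 0 once the copy step above has succeeded.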
Before running any of the examples below, set the following environment variables.
# change these values accordingly
APP_ID=my-app
STATE_STORE=my-counts
STATE_STORE_DIR=/tmp/kafka-streams
TASKS=$(ls $STATE_STORE_DIR/$APP_ID)
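The ls call above returns everything inside the application directory, which may include entries that are not task directories, or tasks that do not host this particular store. A minimal sketch of a stricter listing follows. list_store_dirs is a hypothetical helper name, and it assumes the state dir / app id / task / rocksdb / store layout used throughout this article.

```shell
#!/bin/bash
# list_store_dirs is a hypothetical helper (the name is mine).
# It prints one RocksDB directory per task that actually hosts the store,
# assuming the layout $STATE_STORE_DIR/$APP_ID/<task>/rocksdb/$STATE_STORE.
list_store_dirs() {
  local dir
  for dir in "$STATE_STORE_DIR/$APP_ID"/*/rocksdb/"$STATE_STORE"; do
    # an unmatched glob stays literal, so keep only real directories
    if [ -d "$dir" ]; then
      echo "$dir"
    fi
  done
}
```

Each printed path can be passed straight to the --db option of ldb.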
#!/bin/bash
# view all keys
for i in $TASKS; do
  ldb --db="$STATE_STORE_DIR/$APP_ID/$i/rocksdb/$STATE_STORE" scan 2>/dev/null
done
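Kafka Streams stores keys and values as serialized bytes, so the raw scan output can be hard to read for anything other than plain strings. The sketch below asks ldb to print values as hex and caps the number of entries returned for a task. It assumes your ldb build supports the --value_hex and --max_keys options (check ldb --help to confirm), and scan_store_hex is a hypothetical helper name.

```shell
#!/bin/bash
# scan_store_hex is a hypothetical helper (the name is mine).
# $1: task id (e.g. 1_9), $2: max entries to print (defaults to 10)
# Assumes ldb supports --value_hex and --max_keys; verify with `ldb --help`.
scan_store_hex() {
  local task=$1
  local limit=${2:-10}
  ldb --db="$STATE_STORE_DIR/$APP_ID/$task/rocksdb/$STATE_STORE" \
    scan --value_hex --max_keys="$limit"
}
```

For example, scan_store_hex 1_9 5 would print at most five entries from task 1_9.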
#!/bin/bash
# count keys
for i in $TASKS; do
  ldb --db="$STATE_STORE_DIR/$APP_ID/$i/rocksdb/$STATE_STORE" scan 2>/dev/null
done \
| wc -l | tr -d ' '
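The pipeline above produces a single total across all tasks. When checking how evenly keys are spread across tasks, a per-task breakdown can be more useful. Here is a minimal sketch, with count_keys_per_task as a hypothetical helper name. It counts the lines printed by scan, and passes --hex (assuming your ldb build supports it) so that a newline byte inside a key or value cannot inflate the count.

```shell
#!/bin/bash
# count_keys_per_task is a hypothetical helper (the name is mine).
# Prints "<task> <count>" for every task listed in $TASKS.
# Tasks that do not host the store are reported with a count of 0.
# --hex is assumed to be supported by your ldb build (see `ldb --help`).
count_keys_per_task() {
  local task count
  for task in $TASKS; do
    count=$(ldb --db="$STATE_STORE_DIR/$APP_ID/$task/rocksdb/$STATE_STORE" \
      scan --hex 2>/dev/null | wc -l | tr -d ' ')
    echo "$task $count"
  done
}
```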
#!/bin/bash
# show table properties for each task
for i in $TASKS; do
  TABLE_PROPERTIES=$(sst_dump --file="$STATE_STORE_DIR/$APP_ID/$i/rocksdb/$STATE_STORE" --show_properties)
  echo -e "Table properties for task: $i\n$TABLE_PROPERTIES\n\n"
done
# example output
Table properties for task: 1_9
from [] to []
Process /tmp/kafka-streams/my-app/1_9/rocksdb/my-counts/000006.sst
Sst file format: block-based
Table Properties:
------------------------------
# data blocks: 1
# entries: 2
raw key size: 18
raw average key size: 9.000000
raw value size: 88
raw average value size: 44.000000
data block size: 125
index block size: 35
filter block size: 0
(estimated) table size: 160
filter policy name: N/A
column family ID: 0
column family name: default
comparator name: rocksdb.InternalKeyComparator:leveldb.BytewiseComparator
merge operator name: nullptr
property collectors names: []
SST file compression algo: NoCompression
# deleted keys: 0
# merge operands: 0
Raw user collected properties
------------------------------
# rocksdb.block.based.table.index.type: 0x00000000
# rocksdb.block.based.table.prefix.filtering: 0x30
# rocksdb.block.based.table.whole.key.filtering: 0x31
# rocksdb.deleted.keys: 0x00
# rocksdb.merge.operands: 0x00
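The properties dump is verbose, so it can help to pull out a single field. The sketch below sums the "# entries" lines from output like the example above, giving a rough count of entries recorded in the SST files (not necessarily the number of live keys, since unflushed or overwritten data is not accounted for). sum_sst_entries is a hypothetical helper name, and the parsing assumes the output format shown above, which may differ between RocksDB versions.

```shell
#!/bin/bash
# sum_sst_entries is a hypothetical helper (the name is mine).
# Reads `sst_dump --show_properties` output on stdin and sums every
# "# entries: N" line, i.e. one line per SST file that was processed.
sum_sst_entries() {
  awk -F': *' '/^[ \t]*# entries:/ { total += $2 } END { print total + 0 }'
}
```

To use it, pipe the sst_dump command from the loop above into sum_sst_entries instead of storing the output in TABLE_PROPERTIES.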
I’ll update this article as I come across more use cases. In the meantime, feel free to explore the ldb and sst_dump docs for more information on how to use these tools :)