Posts /

Using ldb and sst_dump to inspect Kafka Streams state stores

16 Mar 2017

Overview

If you are using the default state store in Kafka Streams (RocksDB), you can use ldb and sst_dump (two admin tools that can (optionally) be built when installing RocksDB), to inspect and interact with your state stores.

Note: be careful when writing to state stores outside of Kafka Streams. I recommend performing read-only operations unless you have a very good reason to manipulate this data outside of your Kafka Streams app.

Installing

# install gflags (Google Commandline Flags)
$ brew install gflags

# clone rocksdb
$ git clone git@github.com:facebook/rocksdb.git

# build the binaries
$ cd rocksdb
$ make ldb sst_dump

That will drop a binary called ldb, and another called sst_dump, in the current directory. Copy these to some location on your path, e.g.

$ cp ldb /usr/local/bin/
$ cp sst_dump /usr/local/bin/

Examples

Before running any of the examples below, set the following environment variables.

# change these values accordingly
APP_ID=my-app
STATE_STORE=my-counts
STATE_STORE_DIR=/tmp/kafka-streams
TASKS=$(ls $STATE_STORE_DIR/$APP_ID)

View all keys

#!/bin/bash

# view all keys
for i in $TASKS; do 
  ldb --db=$STATE_STORE_DIR/$APP_ID/$i/rocksdb/$STATE_STORE scan 2>/dev/null; 
done

Count keys

#!/bin/bash

# count keys
for i in $TASKS; do 
  ldb --db=$STATE_STORE_DIR/$APP_ID/$i/rocksdb/$STATE_STORE scan 2>/dev/null; 
done \
| wc -l | tr -d ' '

Show table properties

#!/bin/bash
for i in $TASKS; do
    TABLE_PROPERTIES=$(sst_dump --file=$STATE_STORE_DIR/$APP_ID/$i/rocksdb/$STATE_STORE --show_properties)
    echo -e "Table properties for task: $i\n$TABLE_PROPERTIES\n\n"
done

# example output
Table properties for task: 1_9
from [] to []
Process /tmp/kafka-streams/my-app/1_9/rocksdb/my-counts/000006.sst
Sst file format: block-based
Table Properties:
------------------------------
  # data blocks: 1
  # entries: 2
  raw key size: 18
  raw average key size: 9.000000
  raw value size: 88
  raw average value size: 44.000000
  data block size: 125
  index block size: 35
  filter block size: 0
  (estimated) table size: 160
  filter policy name: N/A
  column family ID: 0
  column family name: default
  comparator name: rocksdb.InternalKeyComparator:leveldb.BytewiseComparator
  merge operator name: nullptr
  property collectors names: []
  SST file compression algo: NoCompression
  # deleted keys: 0
  # merge operands: 0
Raw user collected properties
------------------------------
  # rocksdb.block.based.table.index.type: 0x00000000
  # rocksdb.block.based.table.prefix.filtering: 0x30
  # rocksdb.block.based.table.whole.key.filtering: 0x31
  # rocksdb.deleted.keys: 0x00
  # rocksdb.merge.operands: 0x00

What now?

I’ll make updates to this article as I come across more use cases, but feel free to explore the ldb and sst docs for more information on how to use these tools :)