K  S  Q  L

    Streaming SQL Engine


  • Continuous queries against Kafka topics
  • Familiar SQL syntax
  • Easy to deploy
  • Highly scalable

A very quick

Kafka Refresher

  • Topics
  • Partitions
  • Consumers
  • Producers

Topics

&

Partitions

Data is stored in topics

Topics are split into partitions

#!

                          Topic
              
              _____________________________________ 
Partition 1   |  |  |  |  |  |  |  |  |  |  |  |
              _____________________________________ 

              _____________________________________ 
Partition 2   |  |  |  |  |  |  |  |  |  |  |  |
              _____________________________________
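Which partition a keyed message lands on comes down to hashing the key. A rough sketch of the idea (the real producer default partitioner uses murmur2 over the key bytes; `hashlib.md5` here is only a stand-in to make it concrete):

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Messages with the same key always land on the same partition."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Same key -> same partition, so ordering per key is preserved.
assert partition_for(b"campaign_1", 4) == partition_for(b"campaign_1", 4)
```

This is why per-key ordering holds within a partition but not across the whole topic.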

Producers

&

Consumers

Producer
  |                                                     _______ Consumer 1
  |                                                    /
  |      ________________________________________  __ /
   --->      |  |  |  |  |  |  |  |  |  |  |  |  
         ________________________________________   <---------- Consumer 2
         
                                                    \
                                                     \
                                                      \ _______ Consumer 3

Consumer Groups

Anatomy of a message

// simplified

Timestamp  => int64
Key        => bytes
Value      => bytes
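The simplified record above can be modeled in a few lines (field names follow the slide; real Kafka records also carry headers, offsets, and more):

```python
from dataclasses import dataclass

@dataclass
class Record:
    # Simplified Kafka record, mirroring the anatomy above.
    timestamp: int  # int64, epoch milliseconds
    key: bytes
    value: bytes

r = Record(timestamp=1537726054423, key=b"", value=b'{"response_code": 401}')
assert r.timestamp == 1537726054423
```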

Stream processing in recent years

First generation


  • Use Kafka producer / consumer APIs directly
  • Fine grained control over runtime environment
  • High maintenance costs
  • Usually on dedicated hardware :(

Kafka Streams


  • Java apps
  • Use Kafka Streams library for defining topologies
  • Less control over application loop / runtime
  • Usually a containerized deployment model

Kafka Streams is great, and we'll continue to use it for more complex topologies. But for simpler topologies, we can use...

K S Q L


  • Good for simpler topologies, middleware
  • Abstraction on top of Kafka Streams
  • Simply write your query and deploy to a prod KSQL server

Architecture

Modes

  • Interactive
  • Headless

Interactive

  • REST API is enabled
  • Execute queries from the ksql> CLI
  • Good for experimentation

Headless

  • REST API disabled, so no client access
  • Queries are executed from predefined SQL file
  • Ideal for production deployments

Exploring the CLI

Viewing configs

ksql> show properties ;

 Property                                               | Value
------------------------------------------------------------------------------------------
 ksql.extension.dir                                     | ext
 ksql.output.topic.name.prefix                          |
 ksql.persistent.prefix                                 | query_
 ksql.schema.registry.url                               | http://localhost:8081
 ksql.service.id                                        | dev
 ksql.sink.partitions                                   | 4
 ksql.sink.replicas                                     | 1
 ksql.sink.window.change.log.additional.retention       | 1000000
 ksql.statestore.suffix                                 | _ksql_statestore
 ksql.streams.application.id                            | KSQL_REST_SERVER_DEFAULT_APP_ID
 ksql.streams.auto.offset.reset                         | earliest
 ksql.streams.bootstrap.servers                         | localhost:9092
 ksql.streams.cache.max.bytes.buffering                 | 10000000
 ksql.streams.commit.interval.ms                        | 2000
 ...
------------------------------------------------------------------------------------------

Setting configs

ksql> SET 'ksql.udfs.enabled' = 'false';
ksql> SET 'auto.offset.reset' = 'latest';

Viewing topics

ksql> show topics ;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
 api-logs    | false      | 4          | 1                  | 0         | 0
 locations   | false      | 4          | 1                  | 0         | 0
-----------------------------------------------------------------------------------------
ksql> PRINT 'api-logs' ;

Format:JSON
{"ROWTIME":1537726054423,"ROWKEY":"null","response_code":401,"username":"trouble_maker"}
{"ROWTIME":1537726079008,"ROWKEY":"null","response_code":401,"username":"trouble_maker"}
{"ROWTIME":1537726044944,"ROWKEY":"null","response_code":401,"username":"trouble_maker"}
{"ROWTIME":1537726478848,"ROWKEY":"null","response_code":200,"username":"trouble_maker"}
{"ROWTIME":1537726073925,"ROWKEY":"null","response_code":200,"username":"good_user"}
...

Queries

Queries can represent Kafka data as either:

  • Streams
  • Tables

Stream

.....................

campaign_1  => created
campaign_2  => created
campaign_1  => sent
campaign_2  => updated
campaign_2  => sent

Table

.....................

campaign_1  => sent
campaign_2  => sent
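The difference can be sketched in a few lines: a stream keeps every event, while a table keeps only the latest value per key (a hypothetical sketch of the semantics, not KSQL internals):

```python
events = [
    ("campaign_1", "created"),
    ("campaign_2", "created"),
    ("campaign_1", "sent"),
    ("campaign_2", "updated"),
    ("campaign_2", "sent"),
]

# Stream view: the full, append-only sequence of events.
stream = list(events)

# Table view: the latest value per key (later events overwrite earlier ones).
table = {}
for key, value in events:
    table[key] = value

assert len(stream) == 5
assert table == {"campaign_1": "sent", "campaign_2": "sent"}
```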

Streams

ksql> CREATE STREAM api_logs (
        response_code INT,
        username VARCHAR
      ) 
      WITH (kafka_topic='api-logs', value_format='JSON');


 Message
----------------
 Stream created
----------------

Describe streams

ksql> SHOW STREAMS ;

 Stream Name | Kafka Topic | Format
------------------------------------
 API_LOGS    | api-logs    | JSON
------------------------------------



ksql> describe API_LOGS ;

Name                 : API_LOGS
 Field         | Type
-------------------------------------------
 ROWTIME       | BIGINT           (system)
 ROWKEY        | VARCHAR(STRING)  (system)
 RESPONSE_CODE | INTEGER
 USERNAME      | VARCHAR(STRING)

Describe extended

ksql> DESCRIBE EXTENDED API_LOGS ;

Name                 : API_LOGS
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : api-logs (partitions: 4, replication: 1)

 Field         | Type
-------------------------------------------
 ROWTIME       | BIGINT           (system)
 ROWKEY        | VARCHAR(STRING)  (system)
 RESPONSE_CODE | INTEGER
 USERNAME      | VARCHAR(STRING)
-------------------------------------------

Local runtime statistics
------------------------
messages-per-sec:     14.38
total-messages:       1208     
last-message:         09/24/18 2:10:12 PM GMT
failed-messages:      0      
last-failed:          n/a

(Statistics of the local KSQL server interaction with the Kafka topic api-logs)

The topic is registered now, which means it can be queried

ksql> show topics ;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
 api-logs    | true       | 4          | 1                  | 0         | 0
 locations   | false      | 4          | 1                  | 0         | 0
-----------------------------------------------------------------------------------------

Select

ksql> SELECT username, response_code 
      FROM API_LOGS ;


# output
trouble_maker | 401
trouble_maker | 401
good_user | 200

# query continues to run.
# output after 5s
mitch | 200
good_user | 200

Continuous

Where does the query output go?

It depends...

Queries can be either:

  • Persistent
  • Non-persistent

Persistent

  • Results are written back to Kafka
  • Connect, Streams, ... can read the output and do something with it

Non-persistent

  • Still continuously streaming queries
  • Results aren't written to an output topic

Non-persistent

ksql> SELECT username, response_code 
      FROM API_LOGS ;

Persistent

ksql> CREATE STREAM response_codes_only AS SELECT response_code FROM API_LOGS ;


ksql> show topics ;

 Kafka Topic         | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-------------------------------------------------------------------------------------------------
 api-logs            | true       | 4          | 1                  | 4         | 1
 locations           | false      | 4          | 1                  | 0         | 0
 RESPONSE_CODES_ONLY | true       | 4          | 1                  | 0         | 0
------------------------------------------------------------------------------------------------

Filtering

ksql> SELECT username, response_code 
      FROM API_LOGS 
      WHERE response_code = 401;


# output
trouble_maker | 401
trouble_maker | 401
trouble_maker | 401

# output after 20s
trouble_maker | 401
chaos_monkey | 401

Aggregations

ksql> SELECT username, count(*) 
      FROM API_LOGS 
      GROUP BY username;


# output
good_user | 1
trouble_maker | 2
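The aggregation behaves like a running count per key. A minimal sketch of the same logic, assuming the sample rows shown earlier:

```python
from collections import Counter

rows = [
    ("good_user", 200),
    ("trouble_maker", 401),
    ("trouble_maker", 401),
]

# GROUP BY username + count(*): one running counter per key.
counts = Counter(username for username, _ in rows)

assert counts["good_user"] == 1
assert counts["trouble_maker"] == 2
```

In KSQL the counts are updated continuously as new records arrive, rather than computed once over a finite set.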

Windowing

ksql> SELECT username, count(*) 
      FROM API_LOGS 
      WINDOW TUMBLING (SIZE 1 MINUTE) 
      GROUP BY username;


# output
trouble_maker | 2
trouble_maker | 1
good_user | 1

3 types of windows

  • Hopping
  • Tumbling
  • Session

Hopping window

WINDOW HOPPING (SIZE 20 SECONDS, ADVANCE BY 5 SECONDS)

Tumbling window

WINDOW TUMBLING (SIZE 20 SECONDS)

Session windows

WINDOW SESSION (300 SECONDS)

  • Records with the same key are grouped into sessions
  • A session closes after a configurable gap (e.g. 300 seconds) with no new data for the key
  • Records arriving after a session has closed start a new session
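For tumbling and hopping windows, membership is simple arithmetic on the record timestamp. A sketch of which window(s) a timestamp falls into (sizes in milliseconds, mirroring the SIZE / ADVANCE BY clauses above):

```python
def tumbling_window(ts: int, size: int) -> int:
    """A record belongs to exactly one tumbling window; return its start."""
    return ts - ts % size

def hopping_windows(ts: int, size: int, advance: int) -> list:
    """A record can belong to several overlapping hopping windows."""
    starts = []
    start = ts - ts % advance
    while start > ts - size:
        starts.append(start)
        start -= advance
    return sorted(starts)

# 20s tumbling windows: ts=25_000 falls in the window starting at 20_000.
assert tumbling_window(25_000, 20_000) == 20_000

# 20s windows advancing every 5s: ts=25_000 is in four overlapping windows.
assert hopping_windows(25_000, 20_000, 5_000) == [10_000, 15_000, 20_000, 25_000]
```

A tumbling window is just the special case of a hopping window where SIZE equals ADVANCE BY.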

Time

  • Processing time
  • Event time

Processing time

// default: use the Kafka record's metadata timestamp

Timestamp  => int64
Key        => bytes
Value      => bytes

Event time processing

ksql> CREATE STREAM api_logs (
        response_code INT,
        username VARCHAR,
        request_time BIGINT
      ) 
      WITH (KAFKA_TOPIC='api-logs',
            VALUE_FORMAT='JSON',
            TIMESTAMP='request_time');

Tables

ksql> CREATE TABLE locations (
        username VARCHAR, 
        city VARCHAR
      ) WITH (kafka_topic='locations', key='username', value_format='JSON') ;

 Message
---------------
 Table created
---------------

Tables

ksql> show tables ;

 Table Name     | Kafka Topic    | Format | Windowed
-----------------------------------------------------
 LOCATIONS      | locations      | JSON   | false
-----------------------------------------------------


ksql> describe LOCATIONS ;

Name                 : LOCATIONS
 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 USERNAME | VARCHAR(STRING)
 CITY     | VARCHAR(STRING)
--------------------------------------

Joins

  • Can be used for combining / enriching data
  • [ LEFT | FULL | INNER ] JOIN
Join example

ksql> CREATE STREAM api_logs_enriched AS 
      SELECT api_logs.username, api_logs.response_code, locations.city
      FROM api_logs
      LEFT JOIN locations on api_logs.username = locations.username;



 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT *
      FROM api_logs_enriched ;

# output
1537747980497 | trouble_maker | trouble_maker | 401 | Atlanta
1537748149782 | good_user | good_user | 401 | null
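The join above amounts to a lookup into the table for each stream record. A hypothetical sketch of the LEFT JOIN semantics (not the Streams implementation):

```python
# Table: latest known city per username.
locations = {"trouble_maker": "Atlanta"}

# Stream: incoming API log records.
api_logs = [
    ("trouble_maker", 401),
    ("good_user", 401),
]

# LEFT JOIN: every stream record is emitted; missing table rows yield None
# (shown as null in the KSQL output).
enriched = [(user, code, locations.get(user)) for user, code in api_logs]

assert enriched == [
    ("trouble_maker", 401, "Atlanta"),
    ("good_user", 401, None),
]
```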

Functions

ksql> show functions ;

 Function Name           | Type
-------------------------------------
 ABS                     | SCALAR
 ARRAYCONTAINS           | SCALAR
 CEIL                    | SCALAR
 CONCAT                  | SCALAR
 COUNT                   | AGGREGATE
 EXTRACTJSONFIELD        | SCALAR
 FETCH_FIELD_FROM_STRUCT | SCALAR
 FLOOR                   | SCALAR
 GEO_DISTANCE            | SCALAR
 IFNULL                  | SCALAR
 LCASE                   | SCALAR
 LEN                     | SCALAR
 MASK                    | SCALAR
 MASK_KEEP_LEFT          | SCALAR
 MASK_KEEP_RIGHT         | SCALAR
 MASK_LEFT               | SCALAR
 MASK_RIGHT              | SCALAR
 MAX                     | AGGREGATE
 MIN                     | AGGREGATE
 RANDOM                  | SCALAR
 ROUND                   | SCALAR
 STRINGTOTIMESTAMP       | SCALAR
 SUBSTRING               | SCALAR
 SUM                     | AGGREGATE
 TIMESTAMPTOSTRING       | SCALAR
 TOPK                    | AGGREGATE
 TOPKDISTINCT            | AGGREGATE
 TRIM                    | SCALAR
 UCASE                   | SCALAR
-------------------------------------

Functions

ksql> describe function UCASE ;

Name        : UCASE
Author      : confluent
Version     :
Overview    :
Type        : scalar
Jar         : internal
Variations  :

	Arguments   : VARCHAR
	Returns     : VARCHAR
	Description :

UDFs and UDAFs

  • User defined functions and User defined aggregate functions
  • Build your own custom functions for use in KSQL queries
  • Deployed as JARs inside KSQL container

Custom functions

// KSQL UDF annotations, assumed on the classpath
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(name = "synthwave_name", description = "get your synthwave name")
public class SynthwaveName {

  @Udf(description = "get your synthwave name")
  public String synthwaveName(String name) {
    // do something
    return "";
  }
}
ksql> SELECT name, synthwave_name(name)
      FROM users;

# output
anthony | synth neo
coty | sunset laser
erin | force grid
garret | drive synth
kale | night synth
kasa | night synth
matt | cruise synth
mitch | cruise summer
peony | terror force

Explaining queries

  • View the topology definition

Explain example

ksql> show queries ;

 Query ID                   | Kafka Topic         | Query String
-----------------------------------------------------------------
 CSAS_RESPONSE_CODES_ONLY_1 | RESPONSE_CODES_ONLY | ...
------------------------------------------------------------------


ksql> explain CSAS_RESPONSE_CODES_ONLY_1 ;

...

Processing topology
-------------------
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [api-logs])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> KSTREAM-TRANSFORMVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    ...
    Sink: KSTREAM-SINK-0000000005 (topic: RESPONSE_CODES_ONLY)
      <-- KSTREAM-MAPVALUES-0000000004
https://zz85.github.io/kafka-streams-viz/

Scaling

  • partition count
  • ksql.streams.num.stream.threads (kafka streams)
  • spec.replicas (k8s)
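These knobs interact: each partition is consumed by at most one stream thread in the group, so the partition count caps total parallelism. A sketch of the effective parallelism, assuming the 4-partition topics used above:

```python
def effective_parallelism(partitions: int, replicas: int, threads_per_replica: int) -> int:
    """Threads beyond the partition count sit idle."""
    return min(partitions, replicas * threads_per_replica)

# 4 partitions, 2 KSQL replicas with 1 stream thread each: parallelism 2.
assert effective_parallelism(4, 2, 1) == 2

# More threads than partitions doesn't help: still capped at 4.
assert effective_parallelism(4, 3, 2) == 4
```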

Fault tolerance

Consumer Groups

Consumers maintain their membership by

heartbeating back to the cluster

#!
                                                              _
      ;#'#'.   ~'#'#;                                        |-|
     #'     `/      #'                                    _  |=|
      #.           .#                                    |-| |_|  _
      '#.         .#'  °º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤  |=|     |-|
        '#.     .#'                                      |_|     |=| 
          '#. .#'                                                |_|
            '#'
#!

-------------------------------------------------------
 ______ 
|.     |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 1
| KSQL |
| '-,  |
|______|°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 2

------------------------------------------------------|
 ______ 
|.     |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 3
| KSQL |
| '-,  |
|______|°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 4

-------------------------------------------------------
#!

-------------------------------------------------------
 ______ 
|.     |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 1
| KSQL |
| '-,  |
|______|°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 2

------------------------------------------------------|
                    _ ._  _ , _ ._
                  (_ ' ( `  )_  .__)
                ( (  (    )   `)  ) _)
              (__ (_   (_ . _) _) ,__)
                  `~~`\ ' . /`~~`
                        ;   ;
                        /   \
          _____________/_ __ \_____________
#!

-------------------------------------------------------
 ______ 
|.     |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 1
| KSQL |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 2
| '-,  |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 3
|______|°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 4

------------------------------------------------------|

                 +-------+
                 | ˥ΌSʞ  |
 ._  _ , _ .`~~  +-------+ /`~~` _ .`~~`\ '_ .`~~_  _

Deploying

Deploying

  • Add your query to the pando-ksql configmap
  • Submit a PR
  • Once it's merged, your query will be deployed automatically

Monitoring

  • JMX / Prometheus Exporter
  • akfak
  • Elastalert (errors)

Use cases

Basic

  • Reshaping data
  • Reserializing data from one format to another
  • Topic inspection, sampling, and troubleshooting

Legal

  • Data masking / filtering
  • Branching streams based on message contents or data usage permissions

Other

  • Data enrichment with joins
  • Suspicious user activity (e.g. high API usage)
  • Anomaly detection in other systems
  • Sentiment analysis of social data using custom UDFs

What now?

# clone the repo
$ git clone git@github.com:magicalpipelines/ksql-helm-chart.git

# deploy locally
$ ./script/build && ./script/deploy

# start the CLI
$ kubectl exec -it <POD_NAME> ksql
#!
                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2018 Confluent Inc.

CLI v5.0.0, Server v5.0.0 located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Questions?

Art

  • Title: u/uhhhhhhhhhh42 (Reddit)
  • Explosion: www.asciiart.eu
  • Sun: u/MonkehMaster (Reddit)