A very quick
&
Data is stored in topics
Topics are split into partitions
#!
Topic
_____________________________________
Partition 1 | | | | | | | | | | | |
_____________________________________
_____________________________________
Partition 2 | | | | | | | | | | | |
_____________________________________
&
Producer
   |                                             _______ Consumer 1
   |      ________________________________      /
   ----> | | | | | | | | | | | |           <---------- Consumer 2
          --------------------------------      \
                                                 _______ Consumer 3
// simplified
Timestamp => int64
Key => bytes
Value => bytes
Examples of Kafka usage
Download the Confluent Platform
https://docs.confluent.io/current/ksql/docs/installation/installing.html
Magical Pipelines Helm Chart
ksql> show properties ;
Property | Value
------------------------------------------------------------------------------------------
ksql.extension.dir | ext
ksql.output.topic.name.prefix |
ksql.persistent.prefix | query_
ksql.schema.registry.url | http://localhost:8081
ksql.service.id | dev
ksql.sink.partitions | 4
ksql.sink.replicas | 1
ksql.sink.window.change.log.additional.retention | 1000000
ksql.statestore.suffix | _ksql_statestore
ksql.streams.application.id | KSQL_REST_SERVER_DEFAULT_APP_ID
ksql.streams.auto.offset.reset | earliest
ksql.streams.bootstrap.servers | localhost:9092
ksql.streams.cache.max.bytes.buffering | 10000000
ksql.streams.commit.interval.ms | 2000
...
------------------------------------------------------------------------------------------
ksql> SET 'ksql.udfs.enabled' = 'false';
ksql> SET 'auto.offset.reset' = 'latest';
ksql> SHOW TOPICS ;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------------
_schemas | false | 1 | 1 | 0 | 0
console_purchases | true | 4 | 1 | 0 | 0
game_purchases | true | 4 | 1 | 0 | 0
users | false | 4 | 1 | 0 | 0
-----------------------------------------------------------------------------------------------
ksql> PRINT 'game_purchases' LIMIT 1 ;
Format:AVRO
...
# game_purchases
$ ksql-datagen \
schema=game_purchase.avro \
format=avro \
topic=game_purchases \
key=user_id \
maxInterval=5000 \
iterations=10000 &>/dev/null &
# console_purchases
$ ksql-datagen \
schema=console_purchase.avro \
format=avro \
topic=console_purchases \
key=user_id \
maxInterval=5000 \
iterations=10000 &>/dev/null &
The console producer is pretty handy for
pre-populating tables
$ kafka-console-producer \
--broker-list localhost:9092 \
--property 'parse.key=true' \
--property 'key.separator=:' \
--topic users < /etc/ksql/project/users.json
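With parse.key=true and key.separator=':', each line of the input file must be the record key, the separator, then the JSON value. A hypothetical line of users.json (field values invented for illustration, matching the USERS schema below):

```
user_1000:{"user_id": "user_1000", "first_name": "Arnold", "last_name": "Jones", "email": "arnold@example.com", "address": {"street": "1 Main St", "city": "Springfield", "state": "CA", "zip": 90210}}
```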
RUN SCRIPT to execute the SQL file
ksql> RUN SCRIPT 'store.sql' ;
- A stream in Kafka records the full history of world (or business) events from the beginning of time to today.
- A table in Kafka is the state of the world today.
Michael Noll
.....................
campaign_1 => created
campaign_2 => created
campaign_1 => sent
campaign_2 => updated
campaign_2 => sent
.....................
campaign_1 => sent
campaign_2 => sent
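The duality above can be expressed directly in KSQL: a table materializes current state from the underlying event stream, updating a row whenever a new event for that key arrives. A minimal sketch, assuming a hypothetical campaigns stream keyed by campaign_id (names not from this deck):

```sql
-- Sketch (hypothetical stream/column names): a table whose rows
-- track how many events each campaign has produced so far. Each
-- new record in the stream updates the matching table row.
CREATE TABLE campaign_event_counts AS
  SELECT campaign_id, COUNT(*) AS events
  FROM campaigns
  GROUP BY campaign_id;
```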
CREATE STREAM game_purchases
WITH (
kafka_topic='game_purchases',
value_format='avro'
);
CREATE STREAM console_purchases
WITH (
kafka_topic='console_purchases',
value_format='avro'
);
ksql> SHOW STREAMS ;
Stream Name | Kafka Topic | Format
------------------------------------------------
CONSOLE_PURCHASES | console_purchases | AVRO
GAME_PURCHASES | game_purchases | AVRO
------------------------------------------------
ksql> DESCRIBE game_purchases ;
Name : GAME_PURCHASES
Field | Type
-------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
USER_ID | VARCHAR(STRING)
PURCHASE_ID | VARCHAR(STRING)
PURCHASE_TIME | BIGINT
PRODUCT | VARCHAR(STRING)
PRODUCT_TYPE | VARCHAR(STRING)
CREDIT_CARD | VARCHAR(STRING)
-------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
SELECT user_id, purchase_id, product, product_type
FROM game_purchases;
# output
user_1000 | purchase_83 | Starfox 64 | game
user_1000 | purchase_37 | Goldeneye 007 | game
user_1000 | purchase_11 | Wave Race | game
# query continues to run.
user_4000 | purchase_93 | Starfox 64 | game
It depends...
Queries can be either:
Non-persistent
ksql> SELECT *
FROM game_purchases ;
Persistent
ksql> CREATE STREAM game_names AS
SELECT product as game_name
FROM game_purchases ;
ksql> SHOW TOPICS ;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------------
_schemas | false | 1 | 1 | 0 | 0
console_purchases | true | 4 | 1 | 0 | 0
GAME_NAMES | true | 4 | 1 | 0 | 0
game_purchases | true | 4 | 1 | 8 | 2
users | false | 4 | 1 | 0 | 0
-----------------------------------------------------------------------------------------------
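Persistent queries like the one above keep running server-side (and keep writing to their sink topic) until explicitly terminated. A sketch of the cleanup flow; the query ID shown is illustrative, so copy the real one from the SHOW QUERIES output:

```sql
-- List running persistent queries, stop one, then drop its stream.
-- A stream backed by a persistent query can only be dropped after
-- the query is terminated.
ksql> SHOW QUERIES ;
ksql> TERMINATE CSAS_GAME_NAMES_0 ;
ksql> DROP STREAM game_names ;
```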
CREATE STREAM all_purchases
WITH (
kafka_topic='purchases',
value_format='avro'
) AS
SELECT * FROM game_purchases ;
INSERT INTO all_purchases SELECT * FROM console_purchases;
ksql> SELECT product
FROM console_purchases
WHERE user_id = 'user_2000'
LIMIT 2;
Nintendo 64
Sega Game Gear
CREATE TABLE users(
user_id VARCHAR,
first_name VARCHAR,
last_name VARCHAR,
email VARCHAR,
address STRUCT<street VARCHAR, city VARCHAR, state VARCHAR, zip INT>)
WITH (
kafka_topic='users',
value_format='json',
key='user_id'
);
SHOW TABLES ;
Table Name | Kafka Topic | Format | Windowed
----------------------------------------------
USERS | users | JSON | false
----------------------------------------------
[LEFT | FULL | INNER] JOIN
SELECT
u.user_id, u.first_name,
u.last_name,
p.purchase_id,
p.product,
p.product_type,
p.credit_card
FROM all_purchases p
LEFT JOIN users u
ON p.user_id = u.user_id
LIMIT 3;
user_1000 | Arnold | Jones | purchase_83 | Starfox 64 | game | 2237-7528-7672-6519
user_1000 | Arnold | Jones | purchase_37 | Goldeneye 007 | game | 3349-7292-7388-6154
user_1000 | Arnold | Jones | purchase_11 | Wave Race | game | 5326-3985-1774-2293
ksql> SELECT user_id, COUNT(*) as total_purchases
FROM console_purchases
GROUP BY user_id;
# output
user_1000 | 1
user_4000 | 5
user_3000 | 2
user_5000 | 2
user_2000 | 1
CREATE STREAM purchases_formatted
WITH (
kafka_topic='purchases',
value_format='avro'
) AS
SELECT
u.user_id as user_id,
u.first_name || ' ' || u.last_name as full_name,
p.purchase_id as purchase_id,
p.product as product,
p.product_type as product_type,
MASK_LEFT(p.credit_card, 15, 'x', 'x', 'x', '-') as masked_credit_card
FROM all_purchases p
LEFT JOIN users u
ON p.user_id = u.user_id;
ksql> CREATE TABLE suspicious_transactions
WITH (
kafka_topic='suspicious_transactions',
value_format='avro'
) AS
SELECT
p.user_id,
p.masked_credit_card,
COUNT(*) as total_purchases
FROM purchases_formatted p
WINDOW TUMBLING (SIZE 10 SECONDS)
GROUP BY
p.user_id,
p.masked_credit_card
HAVING COUNT(*) > 3
LIMIT 1;
WINDOW HOPPING (SIZE 20 SECONDS, ADVANCE BY 5 SECONDS)
WINDOW TUMBLING (SIZE 20 SECONDS)
WINDOW SESSION (300 SECONDS)
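Any of these clauses slots into an aggregating query between FROM and GROUP BY. For example, a hopping-window count over the purchase stream defined earlier (a sketch, not from the original deck):

```sql
-- Count purchases per user in overlapping 20-second windows that
-- advance every 5 seconds; each event lands in up to 4 windows.
SELECT user_id, COUNT(*) AS purchases
FROM all_purchases
WINDOW HOPPING (SIZE 20 SECONDS, ADVANCE BY 5 SECONDS)
GROUP BY user_id;
```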
// use the timestamp in the message
Timestamp => int64
Key => bytes
Value => bytes
ksql> CREATE STREAM api_logs (
response_code INT,
username VARCHAR,
request_time BIGINT
)
WITH (KAFKA_TOPIC='api-logs',
VALUE_FORMAT='JSON',
TIMESTAMP='request_time');
ksql> show queries ;
Query ID | Kafka Topic | Query String
-----------------------------------------------------------------
CSAS_RESPONSE_CODES_ONLY_1 | RESPONSE_CODES_ONLY | ...
------------------------------------------------------------------
ksql> explain CSAS_RESPONSE_CODES_ONLY_1 ;
...
Processing topology
-------------------
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [api-logs])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> KSTREAM-TRANSFORMVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
...
Sink: KSTREAM-SINK-0000000005 (topic: RESPONSE_CODES_ONLY)
<-- KSTREAM-MAPVALUES-0000000004
ksql> show functions ;
Function Name | Type
-------------------------------------
ABS | SCALAR
ARRAYCONTAINS | SCALAR
CEIL | SCALAR
CONCAT | SCALAR
COUNT | AGGREGATE
EXTRACTJSONFIELD | SCALAR
FETCH_FIELD_FROM_STRUCT | SCALAR
FLOOR | SCALAR
GEO_DISTANCE | SCALAR
IFNULL | SCALAR
LCASE | SCALAR
LEN | SCALAR
MASK | SCALAR
MASK_KEEP_LEFT | SCALAR
MASK_KEEP_RIGHT | SCALAR
MASK_LEFT | SCALAR
MASK_RIGHT | SCALAR
MAX | AGGREGATE
MIN | AGGREGATE
RANDOM | SCALAR
ROUND | SCALAR
STRINGTOTIMESTAMP | SCALAR
SUBSTRING | SCALAR
SUM | AGGREGATE
TIMESTAMPTOSTRING | SCALAR
TOPK | AGGREGATE
TOPKDISTINCT | AGGREGATE
TRIM | SCALAR
UCASE | SCALAR
-------------------------------------
ksql> describe function UCASE ;
Name : UCASE
Author : confluent
Version :
Overview :
Type : scalar
Jar : internal
Variations :
Arguments : VARCHAR
Returns : VARCHAR
Description :
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(name = "synthwave_name", description = "get your synthwave name")
public class SynthwaveName {
@Udf(description = "get your synthwave name")
public String synthwaveName(String name) {
// map the input name to a synthwave name (logic elided)
return "";
}
}
ksql> SELECT name, synthwave_name(name)
FROM users;
# output
anthony | synth neo
coty | sunset laser
erin | force grid
garret | drive synth
kale | night synth
kasa | night synth
matt | cruise synth
mitch | cruise summer
peony | terror force
Consumers maintain their membership by
heartbeating back to the cluster
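Heartbeating is governed by two standard Kafka consumer settings; a sketch with commonly used values (these are illustrative defaults, not anything specific to this deck):

```
# Standard Kafka consumer settings that control group membership.
heartbeat.interval.ms=3000   # how often the consumer heartbeats to the group coordinator
session.timeout.ms=10000     # if no heartbeat arrives within this window, the consumer
                             # is considered dead and a rebalance is triggered
```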
#!
_
;#'#'. ~'#'#; |-|
#' `/ #' _ |=|
#. .# |-| |_| _
'#. .#' °º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ |=| |-|
'#. .#' |_| |=|
'#. .#' |_|
'#'
#!
-------------------------------------------------------
______
|. |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 1
| KSQL |
| '-, |
|______|°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 2
------------------------------------------------------|
______
|. |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 3
| KSQL |
| '-, |
|______|°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 4
-------------------------------------------------------
#!
-------------------------------------------------------
______
|. |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 1
| KSQL |
| '-, |
|______|°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 2
------------------------------------------------------|
_ ._ _ , _ ._
(_ ' ( ` )_ .__)
( ( ( ) `) ) _)
(__ (_ (_ . _) _) ,__)
`~~`\ ' . /`~~`
; ;
/ \
_____________/_ __ \_____________
#!
-------------------------------------------------------
______
|. |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 1
| KSQL |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 2
| '-, |°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 3
|______|°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤ Partition 4
------------------------------------------------------|
+-------+
| ˥ΌSʞ |
._ _ , _ .`~~ +-------+ /`~~` _ .`~~`\ '_ .`~~_ _
# clone the repo
$ git clone git@github.com:magicalpipelines/ksql-helm-chart.git
# deploy locally
$ ./script/build && ./script/deploy
# start the CLI
$ kubectl exec -it <POD_NAME> ksql