The Exciting Frontier of
Custom KSQL Functions

Hi, I'm Mitch

  • Data Systems Engineer @Mailchimp
  • Working with Kafka ~3 years
  • github.com/mitch-seymour
  • new dad   ❤   Thai food, retrowave

Mailchimp

Agenda


  • Motivation
  • Terminology / Basic UDF example
  • Remote services / models
  • Embedded models
  • Polyglot UDF experiment
  • Summary

Motivation

Why are custom KSQL functions important?

KSQL allows us to operate at a higher level of abstraction

https://docs.confluent.io/current/ksql/docs/concepts/ksql-and-kafka-streams.html

KSQL functions keep us at this higher level of abstraction

even when we need custom business logic

But this talk isn't called

The Important Frontier of Custom KSQL Functions

So, what's exciting about

custom KSQL functions?

Current technological landscape

KSQL functions are a vehicle

that allows us to take advantage of these technologies

Terminology

UDFs


  • User-defined functions
  • Operate on a single row
  • Stateless

UDAFs


  • User-defined aggregate functions
  • Multiple inputs, one output (aggregation)
  • Stateful

Example I

Basic functions


Concepts

  • Building
  • Deploying

Maven Archetype

upcoming*

mvn archetype:generate -X \
    -DarchetypeGroupId=io.confluent.ksql \
    -DarchetypeArtifactId=ksql-udf-quickstart \
    -DarchetypeVersion=5.2.1 \
    -DgroupId=com.mitchseymour \
    -DartifactId=example \
    -Dversion=0.1.0-SNAPSHOT

Reviewer note: I contributed this archetype to KSQL, but it may not be published by conference time. If that's the case, I'll replace this with an unofficial archetype in Maven Central and tell users to expect the official archetype in the next Confluent Platform release.

Maven Archetype

Generated files

.
└── example
    ├── pom.xml
    └── src
        ├── main
        │   ├── java
        │   │   └── com
        │   │       └── mitchseymour
        │   │           ├── ReverseUdf.java
        │   │           └── SummaryStatsUdaf.java
        │   └── resources
        └── test
            └── java
                └── com
                    └── mitchseymour
                        ├── ReverseUdfTests.java
                        └── SummaryStatsUdafTests.java

Reverse UDF

Business logic

public class ReverseUdf {

  public String reverseString(final String source) {
    return new StringBuilder(source).reverse().toString();
  }
}
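Because the business logic is plain Java, it can be exercised without a KSQL server. One thing to watch: assuming KSQL hands a NULL column value to the method as a Java null, new StringBuilder((String) null) throws a NullPointerException. A minimal null-safe variant might look like this; the class name NullSafeReverseUdf is hypothetical, and the KSQL annotations are left out so the sketch has no dependencies.

```java
// Hypothetical null-safe variant of ReverseUdf's business logic.
// KSQL annotations are omitted so the class compiles without KSQL on the classpath.
class NullSafeReverseUdf {

  public String reverseString(final String source) {
    // assumption: a NULL column value arrives as Java null
    if (source == null) {
      return null; // propagate NULL instead of throwing a NullPointerException
    }
    return new StringBuilder(source).reverse().toString();
  }
}
```

For the rows in the "Invoke UDF" example, reverseString("mitch") returns "hctim", and a NULL user would come back as NULL instead of failing.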

Reverse UDF

Annotations

Deploy

  • Package as uber JAR
    mvn package
  • Copy to extensions directory
    cp target/udf.jar /path/to/ksql/ext/
  • Restart KSQL server
    ksql-server-stop && ksql-server-start /path/to/ksql-server.properties

Describe

ksql> DESCRIBE FUNCTION reverse ;

Name        : REVERSE
Author      : Mitch
Version     : 0.1.0
Overview    : Example UDF that reverses an object
Type        : scalar
Jar         : /development/ksql/ext/example-0.1.0.jar
Variations  :

	Variation   : REVERSE(source VARCHAR)
	Returns     : VARCHAR
	Description : Reverse a string
	source      : the value to reverse

Invoke UDF

ksql> SELECT user, reverse(user) FROM api_logs ;
mitch | hctim
elyse | esyle
izzy | yzzi

What about UDAFs?

Summary Stats UDAF

Override these methods

Summary Stats UDAF

initialize

public Map<String, Double> initialize() {
  final Map<String, Double> stats = new HashMap<>();
  stats.put("mean", 0.0);
  stats.put("sample_size", 0.0);
  stats.put("sum", 0.0);
  return stats;
}

Summary Stats UDAF

aggregate

public Map<String, Double> aggregate(
    final Double newValue,
    final Map<String, Double> aggregateValue
) {
  // calculate the new sample size
  final Double sampleSize = 1.0 + aggregateValue.get("sample_size");

  // calculate the new sum
  final Double sum = newValue + aggregateValue.get("sum");

  // build the new aggregate
  aggregateValue.put("mean", sum / sampleSize);
  aggregateValue.put("sample_size", sampleSize);
  aggregateValue.put("sum", sum);
  return aggregateValue;
}

Summary Stats UDAF

merge

public Map<String, Double> merge(
    final Map<String, Double> aggOne,
    final Map<String, Double> aggTwo
) {
  // calculate the combined sample size
  final Double sampleSize = aggOne.get("sample_size") + aggTwo.get("sample_size");
  
  // calculate the combined sum
  final Double sum = aggOne.get("sum") + aggTwo.get("sum");

  // build the new aggregate
  final Map<String, Double> newAggregate = new HashMap<>();
  newAggregate.put("mean", sum / sampleSize);
  newAggregate.put("sample_size", sampleSize);
  newAggregate.put("sum", sum);
  return newAggregate;
}
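To see how initialize, aggregate and merge fit together, the sketch below copies the same logic into a plain class (SummaryStatsSketch is a hypothetical name; the real UDAF implements KSQL's Udaf interface). Aggregating 0.572, 0.572 and 0.872 in one pass, or aggregating [0.572, 0.572] and [0.872] separately and then calling merge, should both end at sample_size=3.0, sum≈2.016 and mean≈0.672, the last row of the "Invoke UDAF" output. merge matters whenever KSQL has to combine two partial aggregates, for example when session windows merge.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone copy of the SummaryStats UDAF logic, without KSQL's Udaf
// interface, so the aggregate lifecycle can be exercised in plain Java.
class SummaryStatsSketch {

  // empty aggregate: nothing seen yet
  Map<String, Double> initialize() {
    final Map<String, Double> stats = new HashMap<>();
    stats.put("mean", 0.0);
    stats.put("sample_size", 0.0);
    stats.put("sum", 0.0);
    return stats;
  }

  // fold one new value into an existing aggregate (mutates and returns it)
  Map<String, Double> aggregate(final Double newValue, final Map<String, Double> agg) {
    final double sampleSize = agg.get("sample_size") + 1.0;
    final double sum = agg.get("sum") + newValue;
    agg.put("mean", sum / sampleSize);
    agg.put("sample_size", sampleSize);
    agg.put("sum", sum);
    return agg;
  }

  // combine two partial aggregates; the mean is recomputed from sum and size
  Map<String, Double> merge(final Map<String, Double> aggOne, final Map<String, Double> aggTwo) {
    final double sampleSize = aggOne.get("sample_size") + aggTwo.get("sample_size");
    final double sum = aggOne.get("sum") + aggTwo.get("sum");
    final Map<String, Double> merged = new HashMap<>();
    merged.put("mean", sum / sampleSize);
    merged.put("sample_size", sampleSize);
    merged.put("sum", sum);
    return merged;
  }
}
```

Storing sum and sample_size alongside mean is what makes merge possible: two means alone cannot be combined correctly without knowing how many values each one covers.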

Build and Deploy

(same as before)

Invoke UDAF

ksql> SELECT endpoint, summary_stats(response_time_ms) 
      FROM api_logs
      GROUP BY endpoint;

api/v2/items | {sample_size=1.0, mean=0.572, sum=0.572}
api/v2/items | {sample_size=2.0, mean=0.572, sum=1.144}
api/v2/items | {sample_size=3.0, mean=0.672, sum=2.016}

Basics  

Example II

Sentiment Analysis


Concepts

  • Remote services
  • Third party dependencies

Sentiment Analysis


  • Product reception
  • Outage impact
  • Audience engagement
  • Abusive content moderation

Sentiment during website outage

Natural Language API

UDFs can have third-party dependencies

<dependencies>
    <!-- KSQL dependencies -->
    <dependency>
        <groupId>io.confluent.ksql</groupId>
        <artifactId>ksql-udf</artifactId>
        <version>${ksql.version}</version>
    </dependency>
    <!-- Third party -->
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-language</artifactId>
        <version>1.67.0</version>
    </dependency>
</dependencies>

Implementation

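import com.google.cloud.language.v1.Document;
import com.google.cloud.language.v1.Document.Type;
import com.google.cloud.language.v1.LanguageServiceClient;
import com.google.cloud.language.v1.Sentiment;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfParameter;
import java.util.HashMap;
import java.util.Map;

// get an API client. full implementation in git repo
private static final LanguageServiceClient apiClient = getClient();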


@Udf(description = "Detect the sentiment of a string of text")
public Map<String, Double> getSentiment(
  @UdfParameter(value = "text", description = "the text to analyze")
  final String text) {
  
  final Document doc = Document.newBuilder()
      .setContent(text)
      .setType(Type.PLAIN_TEXT)
      .build();

  // Detects the sentiment of the text
  final Sentiment sentiment = apiClient.analyzeSentiment(doc).getDocumentSentiment();
  
  // Build the result object
  final Map<String, Double> result = new HashMap<>();
  result.put("score", Double.valueOf(sentiment.getScore()));
  result.put("magnitude", Double.valueOf(sentiment.getMagnitude()));
  return result;
}

Authentication

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json

Some libraries offer environment-based configuration. Is there a better way? We'll see later!

Usage

ksql> SELECT sentiment(tweet_text) FROM tweets ;

# tweet_text == "I love pizza"
{score=0.8999999761581421, magnitude=0.8999999761581421}

https://search.maven.org/artifact/com.mitchseymour/ksql-udf-sentiment-analysis

Let's build on this example

Example III

Conversational interfaces


Concepts

  • Configurability
  • Exceptions

Dialogflow

"Organizations report a reduction of up to 70 percent in call, chat and/or email inquiries after implementing a VCA" - Gartner research

  • Chat bots
  • Virtual assistants
  • Improved customer service

Inputs sourced from user

"I would like to book a room" - user123

Responses generated by Dialogflow via KSQL

"I can help with that. Where would you like to reserve a room?"

Hybrid training


  • Pre-trained ML models
  • User can also provide training data

Implementation :(

Problems


  • Sentiment and Dialogflow UDFs now both pull credentials from the environment
  • Exception handling is not great

Configurable UDF :)
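One way to stop pulling credentials from the environment is to let KSQL hand configuration to the UDF. In KSQL, a UDF class can implement org.apache.kafka.common.Configurable and receive server properties prefixed with ksql.functions.<udf name>. in its configure method. The sketch below assumes that convention; to stay dependency-free it declares configure as a plain method instead of implementing the Kafka interface, and both the class name and the credentials.path property are hypothetical.

```java
import java.util.Map;

// Dependency-free sketch of the configurable-UDF pattern. A real KSQL UDF
// would implement org.apache.kafka.common.Configurable instead.
class ConfigurableUdfSketch {

  // assumed convention: properties for this UDF start with "ksql.functions.<udf name>."
  static final String PREFIX = "ksql.functions.dialogflow.";
  static final String CREDENTIALS_KEY = PREFIX + "credentials.path"; // hypothetical key

  private String credentialsPath;

  // called once when the UDF is loaded, before any records are processed
  public void configure(final Map<String, ?> props) {
    final Object value = props.get(CREDENTIALS_KEY);
    if (value == null) {
      // fail at startup rather than on the first record
      throw new IllegalStateException("Missing required config: " + CREDENTIALS_KEY);
    }
    this.credentialsPath = value.toString();
  }

  public String credentialsPath() {
    return credentialsPath;
  }
}
```

With this approach, each UDF reads its own key from ksql-server.properties (for example a hypothetical ksql.functions.dialogflow.credentials.path=/path/to/key.json), so the sentiment and Dialogflow UDFs no longer have to share one environment variable.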

Error flows

  • Fail fast
  • Fail silently
  • Dead letters
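The three error flows can be sketched as a wrapper around a remote call. Everything here is hypothetical: ErrorFlowSketch, the Mode values, and the in-memory deadLetters list (a production UDF would more likely write failed inputs to a Kafka topic). The sketch shows only the control flow inside the UDF.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical wrapper showing three ways a UDF can react when a remote call fails.
class ErrorFlowSketch {

  enum Mode { FAIL_FAST, FAIL_SILENTLY, DEAD_LETTER }

  // inputs that could not be processed (stand-in for a dead letter topic)
  final List<String> deadLetters = new ArrayList<>();

  private final Function<String, String> remoteCall;
  private final Mode mode;

  ErrorFlowSketch(final Function<String, String> remoteCall, final Mode mode) {
    this.remoteCall = remoteCall;
    this.mode = mode;
  }

  String apply(final String input) {
    try {
      return remoteCall.apply(input);
    } catch (RuntimeException e) {
      switch (mode) {
        case FAIL_FAST:
          // surface the error to the caller immediately
          throw new IllegalStateException("UDF failed for input: " + input, e);
        case DEAD_LETTER:
          // keep the failed input for later inspection or replay, emit NULL
          deadLetters.add(input);
          return null;
        default:
          // FAIL_SILENTLY: emit NULL and keep going
          return null;
      }
    }
  }
}
```

Failing fast makes problems visible right away, failing silently keeps the stream moving at the cost of NULL results, and dead letters keep the stream moving while preserving the failed inputs so they can be replayed.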

Usage

ksql> SELECT DIALOGFLOW(text, sessionId) FROM SOME_STREAM ;


# sample input:
DIALOGFLOW('I would like to book a room', 'user2')

# sample output:
'I can help with that. Where would you like to reserve a room?'

https://search.maven.org/artifact/com.mitchseymour/ksql-udf-dialogflow

How do we safely improve the model over time?

In event-driven architectures, this is easy

"By storing only the events and never the commands, we have a wealth of capability that not only allows the system to be refined, extended and proven but also supports evolutionary change" - Neil Avery
https://www.confluent.io/blog/journey-to-event-driven-part-1-why-event-first-thinking-changes-everything

Neil Avery

  • Implement new UDF with updated models
  • Run against the same event stream
  • Evaluate performance
  • Update original model when ready
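The evaluation step can be sketched in plain Java: replay the same labeled events through the current model and a candidate, and promote the candidate only if it scores at least as well. ModelEvaluationSketch, the LabeledEvent type, and plain accuracy as the metric are all assumptions for illustration; in practice each model version would be its own UDF running against the same event stream.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical champion/challenger check for two versions of a model.
class ModelEvaluationSketch {

  // an event with a known outcome, e.g. an email body and its "ham"/"spam" label
  static final class LabeledEvent {
    final String text;
    final String label;

    LabeledEvent(final String text, final String label) {
      this.text = text;
      this.label = label;
    }
  }

  // fraction of events where the model's prediction matches the known label
  static double accuracy(final Function<String, String> model, final List<LabeledEvent> events) {
    if (events.isEmpty()) {
      return 0.0;
    }
    final long correct = events.stream()
        .filter(e -> e.label.equals(model.apply(e.text)))
        .count();
    return (double) correct / events.size();
  }

  // promote the candidate only if it does at least as well on the same events
  static boolean shouldPromote(final Function<String, String> current,
                               final Function<String, String> candidate,
                               final List<LabeledEvent> events) {
    return accuracy(candidate, events) >= accuracy(current, events);
  }
}
```

Because both versions see exactly the same events, any difference in the score comes from the models rather than from the data.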

[diagram of UDF evolution in progress]

Remote services  

Example IV

Spam detection


Concepts

  • Embedded models
  • Evolutionary UDFs

Enron

  • Hid billions of dollars in debt from investors through accounting fraud
  • Its emails were made public by the Federal Energy Regulatory Commission
  • Let's build a spam detector

  • Training models is easy
  • Models can be exported to Java classes

Let's see how easy it is to build and export a model with h2o

Embed the model

Load the model into the UDF

Run model in UDF

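import hex.genmodel.easy.RowData;
import hex.genmodel.easy.exception.PredictException;
import hex.genmodel.easy.prediction.BinomialModelPrediction;
import io.confluent.ksql.function.udf.Udf;
import java.util.HashSet;
import java.util.Set;

@Udf(description = "Predict ham or spam")
public String predict(final String text) {
    // tokenize the email (getWords, features and model are defined elsewhere in the class)
    final Set<String> words = new HashSet<>(getWords(text));

    // add the features to the row: 1.0 if the vocabulary word appears, else 0.0
    final RowData row = new RowData();
    for (final String feature : features) {
        row.put(feature, words.contains(feature) ? 1.0 : 0.0);
    }

    // make a prediction with the embedded model
    try {
        final BinomialModelPrediction p = model.predictBinomial(row);
        return p.label;
    } catch (PredictException e) {
        throw new RuntimeException("Unable to score email", e);
    }
}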
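The feature-building step can be checked without h2o. The sketch below reproduces the presence encoding that the predict loop produces (1.0 if a vocabulary word appears in the email, else 0.0), using a LinkedHashMap in place of h2o's RowData. The tokenizer is a hypothetical stand-in for getWords, which isn't shown: it lowercases the text and splits on anything that isn't a letter.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Dependency-free sketch of turning an email into a feature row.
class SpamFeatureSketch {

  // hypothetical tokenizer: lowercase, split on runs of non-letters
  static List<String> getWords(final String text) {
    return Arrays.asList(text.toLowerCase(Locale.ROOT).split("[^a-z]+"));
  }

  // one column per vocabulary word, in vocabulary order
  static Map<String, Double> toFeatures(final String text, final List<String> vocabulary) {
    final Set<String> words = new HashSet<>(getWords(text));
    final Map<String, Double> row = new LinkedHashMap<>();
    for (final String feature : vocabulary) {
      row.put(feature, words.contains(feature) ? 1.0 : 0.0);
    }
    return row;
  }
}
```

Keeping this step in a separate, testable method makes it easier to confirm that the UDF builds features the same way the training pipeline did.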

Usage

ksql> SELECT PREDICT_SPAM(text), text FROM emails ;

# sample output:
"ham", "hey, how is it going? its been awhile... hope you are doing okay"
"spam", "click here to improve your well being today. your one stop prescription shop!"

https://github.com/magicalpipelines/ksql-functions/tree/master/udf/h2o-spam-prediction

Remote vs Embedded

Remote


  • −   Higher latency
  • −   Less predictable failure scenarios
  • −   No offline support
  • +   Simple integration
  • +   Built-in model management

Check out Kai's anomaly detection UDF for another h2o example

https://github.com/kaiwaehner/ksql-udf-deep-learning-mqtt-iot

Embedded models  

Example V

Ruby UDF


Concepts

  • Multilingual UDFs
  • Unit testing
  • Polyglot programming
  • Democratize UDF development for non-Java developers
  • Enable rapid prototyping of new data transformation logic
  • This is experimental

Terminology

  • Polyglot
  • Host language
  • Guest language

Installing guest languages

Graal updater (gu)

$ gu install ruby

$ gu available

ComponentId              Version             Component name
----------------------------------------------------------------
python                   1.0.0-rc15          Graal.Python
R                        1.0.0-rc15          FastR
ruby                     1.0.0-rc15          TruffleRuby

Now, let's create a Polyglot UDF!

Color to hex UDF

Unit tests


  • You should test your UDFs
  • Pluggable nature of UDFs makes this easy

Unit tests

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

public class ColorToHexUdfTests {

  @ParameterizedTest(name = "color_to_hex({0})= {1}")
  @CsvSource({
    "deepskyblue, #00bfff",
    "peachpuff, #ffdab9",
    "ghostwhite, #f8f8ff",
    "seagreen, #2e8b57"
  })
  void run(String color, String hex) {
    final ColorToHexUdf udf = new ColorToHexUdf();
    assertEquals(hex, udf.run(color));
  }
}

Usage

ksql> SELECT color, color_to_hex(color) FROM SOME_STREAM ;

# output
mintcream | #f5fffa
peachpuff | #ffdab9
seagreen | #2e8b57
ghostwhite | #f8f8ff

https://github.com/magicalpipelines/ksql-functions/tree/master/udf/color-to-hex

Gotchas


  • Need benchmarks. Initial tests show a startup penalty for some languages
  • Using libs in guest languages may not always work
  • Encountered silent and hard-to-debug failures

What about security?

Java-based UDFs use a security manager

// checks for System.exit calls
try {
    ExtensionSecurityManager.INSTANCE.pushInUdf();
    return udf.eval(actualUdf, args);
} finally {
    ExtensionSecurityManager.INSTANCE.popOutUdf();
}

GraalVM has its own safeguards

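// guest-language permissions are disabled by default; each call below opts in
Context.Builder builder = Context.newBuilder("python", "ruby", "js")
    .allowIO(true)            // file system access
    .allowHostAccess(true)    // access to host (Java) objects
    .allowNativeAccess(true); // native code access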

Is full integration into KSQL possible?

I built a proof of concept (POC)

https://github.com/magicalpipelines/docker-ksql-multilingual-udfs-poc

POC


  • Multilingual UDFs in interactive mode
  • Experimental KSQL language extensions

First, run the POC Docker images.

Prerequisites: https://github.com/magicalpipelines/docker-ksql-multilingual-udfs-poc
# start a KSQL server instance in tab 1
$ docker run --net=host \
    -e BOOTSTRAP_SERVERS=localhost:9092 \
    -ti magicalpipelines/ksql-multilingual-udfs:latest

# start a KSQL CLI client in tab 2
$ docker run --net=host \
   -ti magicalpipelines/ksql-multilingual-udfs:latest \
   ksql

JavaScript (POC only)

CREATE OR REPLACE FUNCTION STATUS_MAJOR(status_code INT) 
RETURNS VARCHAR
LANGUAGE JAVASCRIPT AS $$
    (code) => code.toString().charAt(0) + 'xx'
$$ ;


# output
SELECT endpoint, status_code, STATUS_MAJOR(status_code)
FROM api_logs ;

about.html | 200 | 2xx
index.html | 200 | 2xx
contact.php | 404 | 4xx

Python (POC only)

CREATE OR REPLACE FUNCTION ENDPOINT_TYPE(endpoint VARCHAR) 
RETURNS VARCHAR
LANGUAGE PYTHON AS $$
    lambda endpoint: endpoint.split(".")[1]
$$ ;


# output
SELECT endpoint, endpoint_type(endpoint)
FROM api_logs ;

about.html | html
index.html | html
contact.php | php

Ruby (POC only)

CREATE OR REPLACE FUNCTION REVERSE(endpoint VARCHAR) 
RETURNS VARCHAR
LANGUAGE RUBY AS $$
    lambda { |x| x.reverse }
$$ ;

# output
SELECT endpoint, reverse(endpoint)
FROM api_logs ;

about.html | lmth.tuoba
index.html | lmth.xedni
contact.php | php.tcatnoc

Recap

What did we learn through these examples?


  • Bootstrapping new projects
  • Building
  • Deploying
  • Configuring
  • Error handling
  • Testing

Reviewer note: I may add something about monitoring UDFs as well. WDYT?

Now what?

Go build something exciting

Thanks

[todo]

Questions?