Azure Functions Kafka Trigger Performance Tips, Concept, and Architecture

Video

TL;DR: you can watch the 12-minute video for an overview of the concept and the performance tips.

Single Mode / Batch Mode

The Kafka Extension has two modes: Single Mode and Batch Mode. If you want one message per Azure Function execution, use Single Mode. The Kafka Listener reads messages within a fixed time window, and then the function executes the messages one by one.

Single Mode
[FunctionName(nameof(SampleConsumerAsync))]
public async Task SampleConsumerAsync(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        SslCaLocation = "confluent_cloud_cacert.pem",
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string> kafkaEvent,
    ILogger logger)
{
    // Single Mode: one message per execution.
    logger.LogInformation(kafkaEvent.Value);
}
Batch Mode
[FunctionName(nameof(SampleConsumerAsync1))]
public async Task SampleConsumerAsync1(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        SslCaLocation = "confluent_cloud_cacert.pem",
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents,
    ILogger logger)
{
    // Batch Mode: up to maxBatchSize messages per execution.
    foreach (var kafkaEvent in kafkaEvents)
    {
        logger.LogInformation(kafkaEvent.Value);
    }
}
The batch size is configured with maxBatchSize in host.json:

{
  "version": "2.0",
  "extensions": {
    "kafka": {
      "maxBatchSize": 3
    }
  }
}
@FunctionName("KafkaTrigger-Java-Many")
public void runMany(
    @KafkaTrigger(
        name = "kafkaTriggerMany",
        topic = "message",
        brokerList = "%BrokerList%",
        consumerGroup = "$Default",
        username = "%ConfluentCloudUsername%",
        password = "%ConfluentCloudPassword%",
        authenticationMode = BrokerAuthenticationMode.PLAIN,
        protocol = BrokerProtocol.SASLSSL,
        cardinality = Cardinality.MANY,
        dataType = "string"
    ) String[] kafkaEventData,
    final ExecutionContext context) {
    // cardinality = MANY: the function receives a batch of messages.
    for (String message : kafkaEventData) {
        context.getLogger().info(message);
    }
}
import logging
import typing

from azure.functions import KafkaEvent

def main(kevents: typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())
{
  "scriptFile": "main.py",
  "bindings": [
    {
      "type": "kafkaTrigger",
      "direction": "in",
      "name": "kevents",
      "protocol": "SASLSSL",
      "password": "%ConfluentCloudPassword%",
      "topic": "message_python",
      "authenticationMode": "PLAIN",
      "cardinality": "MANY",
      "dataType": "string",
      "consumerGroup": "$Default",
      "username": "%ConfluentCloudUserName%",
      "brokerList": "%ConfluentBrokerList%"
    }
  ]
}

Channel

The Kafka Extension has a Kafka Listener that consumes messages from the broker. It reads messages during SubscriberIntervalInSeconds. The Kafka Listener doesn't execute functions; instead, it sends the messages to a channel, which is a buffer between the Kafka Listener and the functions. The channel can hold maxBatchSize * ExecutorChannelCapacity messages; ExecutorChannelCapacity is 1 by default, so increasing it increases the buffer size. For example, with maxBatchSize = 3 and ExecutorChannelCapacity = 1, the channel holds at most 3 messages. A function reads either 1 message (Single Mode) or up to maxBatchSize messages (Batch Mode) from the channel and executes. Once the channel reaches its limit, the Kafka Listener stops consuming. ChannelFullRetryIntervalInMs is the interval in milliseconds at which the listener retries writing to the channel.

Kafka Listener, Channel, and Function
{"version": "2.0","extensions": {
"kafka": {
"maxBatchSize": 3,
"SubscriberIntervalInSeconds": 3,
"ExecutorChannelCapacity": 1,
"ChannelFullRetryIntervalInMs": 50,
"MaxPollIntervalMs": 300000
}
}
}
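
To make the buffering concrete, here is a conceptual sketch of the listener/channel/executor pipeline using System.Threading.Channels, with the setting values from the configuration above. This illustrates the behavior only; it is not the extension's actual implementation.

using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

class ChannelSketch
{
    const int MaxBatchSize = 3;
    const int ExecutorChannelCapacity = 1;
    const int ChannelFullRetryIntervalInMs = 50;

    // The channel is the buffer between the Kafka Listener and the function executor.
    static readonly Channel<string> channel =
        Channel.CreateBounded<string>(MaxBatchSize * ExecutorChannelCapacity);

    // Listener side: when the channel is full, wait and retry instead of consuming more.
    static async Task WriteAsync(string message)
    {
        while (!channel.Writer.TryWrite(message))
            await Task.Delay(ChannelFullRetryIntervalInMs);
    }

    // Executor side: take 1 message (Single Mode) or up to MaxBatchSize (Batch Mode).
    static async Task<string[]> ReadBatchAsync()
    {
        var batch = new List<string> { await channel.Reader.ReadAsync() };
        while (batch.Count < MaxBatchSize && channel.Reader.TryRead(out var next))
            batch.Add(next);
        return batch.ToArray();
    }
}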

Parallelism / Concurrency

How can we run functions in parallel or concurrently to increase throughput? The key concepts are Topic, Partition, and Consumer Group. A topic is divided into partitions, and within a consumer group each partition is consumed by at most one consumer, so the number of partitions caps your parallelism; adding a second consumer group lets another set of functions consume the same messages independently. The diagrams illustrate the following scenarios:

(Diagrams: One Function; Two workers (2 functions); Six workers (6 functions); Three workers (6 functions); Two Consumer Groups)
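
As a minimal sketch of the Two Consumer Groups scenario (the function names, consumer group names, and trimmed-down trigger settings here are illustrative, not from the original post): two functions on the same topic with different ConsumerGroup values each receive every message independently.

[FunctionName("ConsumerGroupA")]
public void ConsumerGroupA(
    [KafkaTrigger("BrokerList", "reproduce2", ConsumerGroup = "group-a")] KafkaEventData<string> kafkaEvent,
    ILogger logger)
{
    // group-a receives every message on the topic.
    logger.LogInformation($"group-a: {kafkaEvent.Value}");
}

[FunctionName("ConsumerGroupB")]
public void ConsumerGroupB(
    [KafkaTrigger("BrokerList", "reproduce2", ConsumerGroup = "group-b")] KafkaEventData<string> kafkaEvent,
    ILogger logger)
{
    // group-b also receives every message, independently of group-a.
    logger.LogInformation($"group-b: {kafkaEvent.Value}");
}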

Enable Message Key

A Kafka message contains a Key and a Value. However, if you specify KafkaEventData with a single generic parameter, the Key will be null. If you want to use the message key, you need to set two generic parameters on KafkaEventData, as in the example below.

[FunctionName(nameof(SampleConsumerAsync1))]
public async Task SampleConsumerAsync1(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        SslCaLocation = "confluent_cloud_cacert.pem",
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string, string>[] kafkaEvents,
    ILogger logger)
{
    // With two generic parameters (key type, value type), the Key is populated.
    foreach (var kafkaEvent in kafkaEvents)
    {
        logger.LogInformation($"Key: {kafkaEvent.Key} Value: {kafkaEvent.Value}");
    }
}

Exception and Function not completed

What happens when a function throws an exception, or when the worker reboots?

Offset commit and Retry Policy

Azure Functions supports Retry Policies, and we can use them with the Kafka Extension as well. The flow is as follows (a sketch follows the steps):

  1. The Kafka Extension reads messages from the broker.
  2. It executes your function. If the function throws an exception, the execution is replayed according to the Retry Policy (repeating this step). Once it succeeds or the retry threshold is reached, it moves on to step 3.
  3. It executes Store Offset. https://docs.confluent.io/clients-confluent-kafka-dotnet/current/overview.html#store-offsets
  4. librdkafka auto-commits in the background; only offsets that have been stored are committed.
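
As a minimal sketch (assuming the in-process C# model, where a Retry Policy is the FixedDelayRetry or ExponentialBackoffRetry attribute from Microsoft.Azure.WebJobs; the function name, retry count, and interval are illustrative):

[FunctionName(nameof(ConsumerWithRetry))]
[FixedDelayRetry(5, "00:00:10")] // retry up to 5 times, 10 seconds apart
public void ConsumerWithRetry(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        ConsumerGroup = "$Default")] KafkaEventData<string> kafkaEvent,
    ILogger logger)
{
    // If this throws, the same message is replayed per the Retry Policy;
    // the offset is stored only after success or after retries are exhausted.
    logger.LogInformation(kafkaEvent.Value);
}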

Debug

We use the Confluent.Kafka library, which in turn uses the librdkafka library. Sometimes you want detailed logs to understand the behavior or to troubleshoot. You can use the LibkafkaDebug setting to configure several debug options (search for "debug" in the librdkafka configuration documentation). It also requires the logging.logLevel configuration:

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "default": "Debug"
    }
  },
  "extensions": {
    "kafka": {
      "maxBatchSize": 2,
      "AutoCommitIntervalMs": 200,
      "SubscriberIntervalInSeconds": 3,
      "ExecutorChannelCapacity": 1,
      "ChannelFullRetryIntervalInMs": 50,
      "MaxPollIntervalMs": 300000,
      "LibkafkaDebug": "fetch"
    }
  }
}

Latest Version

I always recommend using the latest official version. Each release includes bug fixes and performance improvements, including on the librdkafka side.

More tuning parameters

That's it. However, there is one more powerful tuning technique: librdkafka provides a bunch of configuration parameters. Not all of them are exposed by the Kafka Extension; however, if you send a pull request, you might be able to add the one you need.

Code Sample for this blog

Kafka Extension Example (github.com)

Conclusion

I introduced the concept, tips, and architecture; I hope it helps you understand the extension. Performance tuning is the tough part. Generally speaking, Batch Mode gives better throughput; however, one customer improved their throughput by switching from Batch Mode to Single Mode. Their function usually took 100–200 ms, but it included a REST API call that could sometimes take 30 seconds. With a maxBatchSize of 100, 100 messages are executed together, and one 30-second REST API call delays all 100 of them, so they ran Single Mode with multiple partitions instead.
I hope you enjoy the Kafka Extension journey!
