Azure Functions Kafka Trigger Performance Tips, Concept, and Architecture

Tsuyoshi Ushio
9 min read · Dec 8, 2020


I am frequently asked about the concept of the Azure Functions Kafka Trigger. I created a video and this blog post to explain the Kafka Trigger’s concept. The Kafka Trigger is one of the most straightforward solutions for a Kafka consumer.

Azure/azure-functions-kafka-extension: Kafka extension for Azure Functions (github.com)

Video

TL;DR: you can watch the 12-minute video for an overview of the concept and the performance tips.

Single Mode / Batch Mode

The Kafka Extension has two modes: Single Mode and Batch Mode. If you want one message per Azure Functions execution, use Single Mode. The Kafka Listener reads messages within a fixed time window, and then the function executes them one by one.

Single Mode

From the Kafka Extension side, Single Mode is identified by the method parameter signature. If the KafkaEventData parameter is not an array, the extension treats it as Single Mode.

[FunctionName(nameof(SampleConsumerAsync))]
public async Task SampleConsumerAsync(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        SslCaLocation = "confluent_cloud_cacert.pem",
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string> kafkaEvent,
    ILogger logger)
{
    // One message per execution in Single Mode.
    logger.LogInformation(kafkaEvent.Value);
    await Task.CompletedTask;
}
Batch Mode

Batch Mode executes a function with several messages. The Kafka Listener reads messages during a fixed time window, then the function is executed with several messages at once. This mode is selected when the method signature has a KafkaEventData<T>[] parameter.

[FunctionName(nameof(SampleConsumerAsync1))]
public async Task SampleConsumerAsync1(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        SslCaLocation = "confluent_cloud_cacert.pem",
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents,
    ILogger logger)
{
    // Several messages per execution in Batch Mode.
    foreach (var kafkaEvent in kafkaEvents)
    {
        logger.LogInformation(kafkaEvent.Value);
    }
    await Task.CompletedTask;
}

You can define maxBatchSize in your host.json. With the setting below, up to 3 messages are sent to a function at a time.

host.json

{
  "version": "2.0",
  "extensions": {
    "kafka": {
      "maxBatchSize": 3
    }
  }
}

If you use Java, you can enable Batch Mode with cardinality = Cardinality.MANY. If you specify the cardinality, you also need to define dataType; usually it is "string".

@FunctionName("KafkaTrigger-Java-Many")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "message",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "%ConfluentCloudPassword%",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
cardinality = Cardinality.MANY,
dataType = "string"

) String[] kafkaEventData,
final ExecutionContext context) {

For the other languages, you can define it in function.json. This is a Python example.

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents: typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

As you can see, cardinality and dataType are defined. For the Java implementation, the azure-functions-maven-plugin generates the function.json, so it is essentially the same.

function.json

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "type": "kafkaTrigger",
      "direction": "in",
      "name": "kevents",
      "protocol": "SASLSSL",
      "password": "%ConfluentCloudPassword%",
      "topic": "message_python",
      "authenticationMode": "PLAIN",
      "cardinality": "MANY",
      "dataType": "string",
      "consumerGroup": "$Default",
      "username": "%ConfluentCloudUserName%",
      "BrokerList": "%ConfluentBrokerList%"
    }
  ]
}

Channel

The Kafka Extension has a Kafka Listener that consumes messages from the broker. It reads messages during SubscriberIntervalInSeconds. The Kafka Listener doesn't execute functions; instead, it sends messages to a channel. The channel is a buffer between the Kafka Listener and the functions. The channel can hold maxBatchSize * ExecutorChannelCapacity messages. ExecutorChannelCapacity is 1 by default; if you increase the value, you increase the buffer size. The function reads 1 or maxBatchSize messages from the channel, depending on the mode, and executes. Once the channel reaches its limit, the Kafka Listener stops consuming. ChannelFullRetryIntervalInMs is the time (in ms) to wait before retrying to write to the channel.

Kafka Listener, Channel, and Function
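
Conceptually, this pipeline is a bounded buffer. Below is a minimal sketch, not the extension's actual code, using System.Threading.Channels to illustrate the idea: the capacity plays the role of maxBatchSize * ExecutorChannelCapacity, and the writer has to wait while the buffer is full, just as the Kafka Listener retries after ChannelFullRetryIntervalInMs.

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class ChannelConceptSketch
{
    static async Task Main()
    {
        // Buffer size ~= maxBatchSize * ExecutorChannelCapacity (3 * 1 in the host.json below).
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(3)
        {
            FullMode = BoundedChannelFullMode.Wait // the writer waits while the buffer is full
        });

        // "Kafka Listener": produces messages into the channel.
        var listener = Task.Run(async () =>
        {
            for (var i = 0; i < 10; i++)
            {
                // When the channel is full, WriteAsync waits; the real extension
                // retries after ChannelFullRetryIntervalInMs instead.
                await channel.Writer.WriteAsync($"message-{i}");
            }
            channel.Writer.Complete();
        });

        // "Function executor": consumes one message at a time (Single Mode).
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"Executing function with {message}");
            await Task.Delay(100); // simulated function execution
        }

        await listener;
    }
}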

The other important configuration is MaxPollIntervalMs, the maximum interval allowed between fetches; if it is exceeded, the offset commit won't progress. If you have a big maxBatchSize with Single Mode, the listener reads a bunch of messages and the function runs them one by one, so the time until the last message is processed can be far longer than the default timeout (300000 ms, 5 minutes). For example, with a maxBatchSize of 100 and a function that takes 5 seconds per message, the last message finishes roughly 500 seconds after the fetch, well over the default. In this case, MaxPollIntervalMs is a good tuning parameter.

{"version": "2.0","extensions": {
"kafka": {
"maxBatchSize": 3,
"SubscriberIntervalInSeconds": 3,
"ExecutorChannelCapacity": 1,
"ChannelFullRetryIntervalInMs": 50,
"MaxPollIntervalMs": 300000
}
}
}

Parallelism / Concurrency

How can we run the function in parallel or concurrently to increase throughput? You need to consider these concepts: Topic, Partition, and Consumer Group.

A Kafka broker has Topics. A Topic is a logical unit that receives messages from a producer and passes them to consumers, and it is divided into several partitions. The number of partitions is the maximum number of parallel/concurrent executions of the functions.

In the first example, there are six partitions. The Function App has one worker, and one function is defined with a consumer group. In this case, the function receives messages from all the partitions and executes them one by one; there is no parallel/concurrent execution.

One Function

If you want to run in parallel, you can scale out. Create several workers; then the functions can run in parallel, and the throughput roughly doubles.

Two workers (2 functions)

The maximum throughput is reached when you scale out to 6 workers, the same number as the partitions. Each function consumes from one partition, so six functions run in parallel. The most important point is that the maximum parallelism/concurrency equals the number of partitions. (For example, if one execution takes about 100 ms, one consumer handles roughly 10 messages per second, so six partitions cap out at around 60 messages per second.)

Six workers (6 functions)

What if the scale-out exceeds the number of partitions? The 7th function will do nothing; you just pay extra money. The Azure Functions Premium Plan provides elastic scale; however, it won't scale out beyond the number of partitions.

You might think six workers is the fastest setup; however, you might also want to save money. If each worker's CPU sits at just 5% but you still want good throughput, you can define several functions on one worker, which means defining several functions with the same consumer group.

In this example, the number of consumers (functions) is 6, the same as the fastest setup; however, it is 2 concurrent x 3 parallel. You can get pretty good throughput this way, but you need to balance CPU/memory against cost.

Three workers (6 functions)
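
For illustration, "several functions with the same consumer group" in C# can be as simple as declaring two trigger functions in the same Function App pointing at the same topic and the same consumer group. This is just a sketch in the same style as the earlier examples (the broker, topic, and credential settings are placeholders); each function acts as a separate consumer inside the group, so the partitions are spread across them.

[FunctionName("ConsumerA")]
public void ConsumerA(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        SslCaLocation = "confluent_cloud_cacert.pem",
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] events,
    ILogger logger)
{
    foreach (var e in events) logger.LogInformation($"ConsumerA: {e.Value}");
}

[FunctionName("ConsumerB")]
public void ConsumerB(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        SslCaLocation = "confluent_cloud_cacert.pem",
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] events,
    ILogger logger)
{
    foreach (var e in events) logger.LogInformation($"ConsumerB: {e.Value}");
}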

The last example uses two consumer groups. Two consumer groups don't share offset commits, so when Consumer Group A reads a message from Partition 0, Consumer Group B reads the same message. Within a consumer group, each function doesn't read the same message unless something goes wrong. This strategy works for the App Service Plan, since the Premium Plan's current scaler is optimized for the one partition per worker model.

Two Consumer Groups

Enable Message Key

A Kafka message contains a Key and a Value. However, if you specify only one generic parameter, the Key will be null. If you want to use the message key, you need to specify two generic parameters on KafkaEventData, as in the example.

[FunctionName(nameof(SampleConsumerAsync1))]
public async Task SampleConsumerAsync1(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        SslCaLocation = "confluent_cloud_cacert.pem",
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string, string>[] kafkaEvents,
    ILogger logger)
{
    foreach (var kafkaEvent in kafkaEvents)
    {
        // With two generic parameters, both Key and Value are available.
        logger.LogInformation($"{kafkaEvent.Key}: {kafkaEvent.Value}");
    }
    await Task.CompletedTask;
}

Exception and Function not completed

What happens when the function throws an exception, or when the worker is rebooted?

Kafka messages are read by the Kafka Listener in advance and passed to the functions. Once a function is successfully executed, the offset is committed. If the function is broken, the Kafka Extension will, after recovery, read messages from the last committed offset.

If a reboot happens, the Kafka Listener has already read the message, but execution stops before the function finishes. In this case, the offset commit doesn't progress, so the Kafka Listener will read the same message again.

That is why Kafka Trigger functions should be idempotent: if you execute a function with the same parameters several times, the result is the same. Design your functions to be idempotent. Kafka's original concept is an at-least-once policy, so reading the same message more than once can happen.
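
As one illustration of idempotency, you can deduplicate on a message identity before doing the real work. The sketch below uses an in-memory ConcurrentDictionary (from System.Collections.Concurrent) as a stand-in for whatever durable store you would really use, and it assumes KafkaEventData exposes Topic, Partition, and Offset; treat it as a pattern, not a drop-in implementation.

// A sketch only: the in-memory set stands in for a durable store (database, cache, ...).
private static readonly ConcurrentDictionary<string, bool> processed = new ConcurrentDictionary<string, bool>();

[FunctionName("IdempotentConsumer")]
public void IdempotentConsumer(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents,
    ILogger logger)
{
    foreach (var e in kafkaEvents)
    {
        // Assumes Topic/Partition/Offset properties; verify against your extension version.
        var id = $"{e.Topic}/{e.Partition}/{e.Offset}";
        if (!processed.TryAdd(id, true))
        {
            logger.LogInformation($"Skip duplicate {id}");
            continue; // already handled; a redelivery after a reboot lands here
        }
        // Do the real work here; running it twice with the same input must give the same result.
        logger.LogInformation($"Process {id}: {e.Value}");
    }
}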

Lastly, what about an exception thrown from the function? In this case, the offset commit will still progress. Why? In a high-throughput environment, if the function doesn't commit the offset, the subsequent messages go back to the broker and easily pile up. The official documentation below explains it in more detail.

The Kafka Extension is designed to be similar to the Event Hubs extension.

Azure Functions reliable event processing | Microsoft Docs

Offset commit and Retry Policy

Azure Functions supports a retry policy. We can use it with the Kafka Extension as well.

Azure Functions error handling and retry guidance | Microsoft Docs

However, what happens to the offset commit? Does it progress while retries are happening? The answer is no. We use the Store Offsets strategy in the Kafka Extension.

Kafka .NET Client | Confluent Documentation

The execution steps are like this.

  1. The Kafka Extension reads messages from the broker.
  2. It tries to execute your function. If the function throws an exception, the execution is retried according to the retry policy, repeating step 2. Once it succeeds or reaches the retry threshold, it goes on to step 3.
  3. It executes Store Offsets (a client-level sketch of this pattern follows the list). https://docs.confluent.io/clients-confluent-kafka-dotnet/current/overview.html#store-offsets
  4. librdkafka auto-commits in the background, and only offsets that have been stored are committed.
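
To see what the Store Offsets strategy means at the client level, here is a rough sketch with the plain Confluent.Kafka consumer (not the extension's internal code; the broker address and topic are placeholders): auto offset store is disabled, the offset is stored only after the message is handled, and the background auto-commit commits only stored offsets.

using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder broker
    GroupId = "$Default",
    EnableAutoCommit = true,             // background commit of *stored* offsets (step 4)
    EnableAutoOffsetStore = false        // we decide when an offset becomes committable (step 3)
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("reproduce2"); // placeholder topic

while (true)
{
    var result = consumer.Consume();
    try
    {
        Console.WriteLine($"Processing {result.Message.Value}"); // your function logic (retried on failure)
    }
    catch (Exception)
    {
        // retry / dead-letter handling would go here
    }
    // Like the extension, store the offset even after a failure so the commit keeps progressing.
    consumer.StoreOffset(result);
}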

So in most cases, we don't need to worry about the commit; it progresses after all the retries have finished. However, as I explained in the last section, the Kafka Extension progresses the commit even if the function throws an exception, so you can try-catch and re-throw the exception together with a dead-letter strategy. Unfortunately, the retry policy just executes the function and detects the exception, so if you implement try-catch with dead-letter, it will create a dead letter for each retry. Your dead-letter queue processor should therefore be idempotent.
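
A sketch of that try-catch plus dead-letter pattern could look like the following; ProcessAsync and SendToDeadLetterAsync are hypothetical helpers standing in for your business logic and your dead-letter sink, and because the catch block runs on every retry, the downstream processor must be idempotent.

[FunctionName("ConsumerWithDeadLetter")]
public async Task ConsumerWithDeadLetter(
    [KafkaTrigger(
        "BrokerList",
        "reproduce2",
        Username = "%ConfluentCloudUsername%",
        Password = "%ConfluentCloudPassword%",
        Protocol = BrokerProtocol.SaslSsl,
        ConsumerGroup = "$Default",
        AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents,
    ILogger logger)
{
    foreach (var e in kafkaEvents)
    {
        try
        {
            await ProcessAsync(e.Value); // hypothetical business logic
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Processing failed; sending to dead letter");
            await SendToDeadLetterAsync(e.Value, ex); // hypothetical dead-letter sink; called once per retry
            throw; // re-throw so the retry policy still sees the failure
        }
    }
}

private Task ProcessAsync(string value) => Task.CompletedTask;                          // stub
private Task SendToDeadLetterAsync(string value, Exception ex) => Task.CompletedTask;   // stub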

Debug

We use the Confluent.Kafka library, which in turn uses the librdkafka library. Sometimes you want to see the detailed logs to understand the behavior or for troubleshooting. You can use LibkafkaDebug to configure several options; search for debug on this page. It also requires the logging.logLevel configuration.

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "default": "Debug"
    }
  },
  "extensions": {
    "kafka": {
      "maxBatchSize": 2,
      "AutoCommitIntervalMs": 200,
      "SubscriberIntervalInSeconds": 3,
      "ExecutorChannelCapacity": 1,
      "ChannelFullRetryIntervalInMs": 50,
      "MaxPollIntervalMs": 300000,
      "LibkafkaDebug": "fetch"
    }
  }
}

Latest Version

I always recommend the latest official version. It includes new bug fixes and performance improvements, including on the librdkafka side.

Releases · Azure/azure-functions-kafka-extension (github.com)

More tuning parameters

That's it. However, there is one more powerful tuning technique. librdkafka provides a bunch of configuration parameters. Not all of them are supported by the Kafka Extension; however, if you send a pull request, you might be able to use them.

For example, the Confluent page recommends changing fetch.min.bytes and using a specific compression method.

Optimizing for Throughput — Confluent Documentation

This best-practices article recommends increasing socket.receive.buffer.bytes. It is written for Java, so I'm not sure these parameters work the same way; however, we can make them configurable as well.

20 Best Practices for Working With Apache Kafka at Scale — DZone Big Data

If you want to add more configuration, send a pull request! It won't be challenging. This PR includes such a change. (Sorry, it is a little complex; however, the configuration change itself is a small part. See the changes to KafkaOptions.)

Upgrade 1.5.2 and improve debugging with avoiding limitation of Azur… · Azure/azure-functions-kafka-extension@d6ddb6c (github.com)

Code Sample for this blog

Kafka Extension Example (github.com)

Conclusion

I introduced the concept, tips, and architecture; I hope it helps you understand the Kafka Trigger. Performance tuning is the tough part. Generally speaking, Batch Mode is far better for throughput; however, one customer improved their throughput by moving from Batch Mode to Single Mode. When their function is fast, it takes 100-200 ms; however, it includes a REST API call that sometimes takes 30 seconds. In that case, with a maxBatchSize of 100, 100 messages are executed together, and one 30-second REST API call delays all 100 messages, so they run in Single Mode with multiple partitions instead.
I hope you enjoy the Kafka Extension journey!


Tsuyoshi Ushio

Senior Software Engineer — Microsoft