Proposed Pull Request Change

| title | description | ms.topic | ms.custom | ms.date | zone_pivot_groups |
| --- | --- | --- | --- | --- | --- |
| Apache Kafka trigger for Azure Functions | Use Azure Functions to run your code based on events from an Apache Kafka stream. | reference | devx-track-extended-java, devx-track-js, devx-track-python | 12/26/2025 | programming-languages-set-functions |
--- title: Apache Kafka trigger for Azure Functions description: Use Azure Functions to run your code based on events from an Apache Kafka stream. ms.topic: reference ms.custom: devx-track-extended-java, devx-track-js, devx-track-python ms.date: 12/26/2025 zone_pivot_groups: programming-languages-set-functions --- # Apache Kafka trigger for Azure Functions Use the Apache Kafka trigger in Azure Functions to run your function code in response to messages in Kafka topics. You can also use a [Kafka output binding](functions-bindings-kafka-output.md) to write from your function to a topic. For information on setup and configuration details, see [Apache Kafka bindings for Azure Functions overview](functions-bindings-kafka.md). [!INCLUDE [functions-binding-kafka-plan-support-note](../../includes/functions-binding-kafka-plan-support-note.md)] ## Example ::: zone pivot="programming-language-csharp" The usage of the trigger depends on the C# modality used in your function app, which can be one of the following modes: # [Isolated worker model](#tab/isolated-process) A compiled C# function that uses an [isolated worker process class library](dotnet-isolated-process-guide.md) that runs in a process that's separate from the runtime. # [In-process model](#tab/in-process) [!INCLUDE [functions-in-process-model-retirement-note](../../includes/functions-in-process-model-retirement-note.md)] A compiled C# function that uses an [in-process class library](functions-dotnet-class-library.md) that runs in the same process as the Functions runtime. --- The attributes you use depend on the specific event provider. 
# [Confluent (in-process)](#tab/confluent/in-process) The following example shows a C# function that reads and logs the Kafka message as a Kafka event: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/Confluent/KafkaTrigger.cs" range="10-21" ::: To receive events in a batch, use an input string or `KafkaEventData` as an array, as shown in the following example: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/Confluent/KafkaTriggerMany.cs" range="10-24" ::: The following function logs the message and headers for the Kafka Event: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/Confluent/KafkaTriggerWithHeaders.cs" range="10-27" ::: You can define a generic [Avro schema] for the event passed to the trigger. The following string value defines the generic Avro schema: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs" range="23-41" ::: In the following function, an instance of `GenericRecord` is available in the `KafkaEvent.Value` property: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs" range="43-60" ::: You can define a specific [Avro schema] for the event passed to the trigger. The following code defines the `UserRecord` class: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/User.cs" range="9-32" ::: In the following function, an instance of `UserRecord` is available in the `KafkaEvent.Value` property: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs" range="16-25" ::: For a complete set of working .NET examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/dotnet/). 
# [Event Hubs (in-process)](#tab/event-hubs/in-process) The following example shows a C# function that reads and logs the Kafka message as a Kafka event: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/EventHub/KafkaTrigger.cs" range="10-21" ::: To receive events in a batch, use a string array or `KafkaEventData` array as input, as shown in the following example: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/EventHub/KafkaTriggerMany.cs" range="10-24" ::: The following function logs the message and headers for the Kafka Event: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/EventHub/KafkaTriggerWithHeaders.cs" range="10-26" ::: You can define a generic [Avro schema] for the event passed to the trigger. The following string value defines the generic Avro schema: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs" range="23-41" ::: In the following function, an instance of `GenericRecord` is available in the `KafkaEvent.Value` property: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs" range="43-60" ::: You can define a specific [Avro schema] for the event passed to the trigger. The following code defines the `UserRecord` class: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/User.cs" range="9-32" ::: In the following function, an instance of `UserRecord` is available in the `KafkaEvent.Value` property: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs" range="16-25" ::: For a complete set of working .NET examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/dotnet/). 
# [Confluent (isolated process)](#tab/confluent/isolated-process) The following example shows a C# function that reads and logs the Kafka message as a Kafka event: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/confluent/KafkaTrigger.cs" range="12-24" ::: To receive events in a batch, use a string array as input, as shown in the following example: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/confluent/KafkaTriggerMany.cs" range="12-27" ::: The following function logs the message and headers for the Kafka Event: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/Confluent/KafkaTriggerWithHeaders.cs" range="12-32" ::: For a complete set of working .NET examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/dotnet-isolated/). # [Event Hubs (isolated process)](#tab/event-hubs/isolated-process) The following example shows a C# function that reads and logs the Kafka message as a Kafka event: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/eventhub/KafkaTrigger.cs" range="12-24" ::: To receive events in a batch, use a string array as input, as shown in the following example: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/eventhub/KafkaTriggerMany.cs" range="12-27" ::: The following function logs the message and headers for the Kafka Event: :::code language="csharp" source="~/azure-functions-kafka-extension/samples/dotnet-isolated/eventhub/KafkaTriggerWithHeaders.cs" range="12-32" ::: For a complete set of working .NET examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/dotnet-isolated/). 
--- ::: zone-end ::: zone pivot="programming-language-javascript,programming-language-typescript" The usage of the trigger depends on your version of the Node.js programming model. # [Version 4](#tab/v4) In the Node.js v4 model, you define your trigger directly in your function code. For more information, see the [Azure Functions Node.js developer guide](functions-reference-node.md?pivots=nodejs-model-v4). # [Version 3](#tab/v3) In the Node.js v3 model, you define your trigger in a `function.json` file with your code. For more information, see the [Azure Functions Node.js developer guide](functions-reference-node.md?pivots=nodejs-model-v3). --- In these examples, the event providers are either Confluent or Azure Event Hubs. These examples show how to define a Kafka trigger for a function that reads a Kafka message. ::: zone-end ::: zone pivot="programming-language-javascript" # [Confluent](#tab/confluent/v4) ```javascript const { app } = require("@azure/functions"); async function kafkaTrigger(event, context) { context.log("Event Offset: " + event.Offset); context.log("Event Partition: " + event.Partition); context.log("Event Topic: " + event.Topic); context.log("Event Timestamp: " + event.Timestamp); context.log("Event Key: " + event.Key); context.log("Event Value (as string): " + event.Value); let event_obj = JSON.parse(event.Value); context.log("Event Value Object: "); context.log(" Value.registertime: ", event_obj.registertime.toString()); context.log(" Value.userid: ", event_obj.userid); context.log(" Value.regionid: ", event_obj.regionid); context.log(" Value.gender: ", event_obj.gender); } app.generic("Kafkatrigger", { trigger: { type: "kafkaTrigger", direction: "in", name: "event", topic: "topic", brokerList: "%BrokerList%", username: "%ConfluentCloudUserName%", password: "%ConfluentCloudPassword%", consumerGroup: "$Default", protocol: "saslSsl", authenticationMode: "plain", dataType: "string" }, handler: kafkaTrigger, }); ``` # [Event 
Hubs](#tab/event-hubs/v4) :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript-v4/src/functions/kafkaTrigger.js" range="1-19,21-36"::: # [Confluent](#tab/confluent/v3) This `function.json` file defines the trigger for the Confluent provider: :::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTrigger/function.confluent.json" ::: The following code runs when the function is triggered: :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTrigger/index.js" ::: # [Event Hubs](#tab/event-hubs/v3) This `function.json` file defines the trigger for the Event Hubs provider: :::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTrigger/function.eventhub.json" ::: The following code runs when the function is triggered: :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTrigger/index.js" ::: --- To receive events in a batch, set the `cardinality` value to `many`, as shown in these examples: # [Confluent](#tab/confluent/v4) ```javascript const { app } = require("@azure/functions"); async function kafkaTriggerMany(events, context) { for (const event of events) { context.log("Event Offset: " + event.Offset); context.log("Event Partition: " + event.Partition); context.log("Event Topic: " + event.Topic); context.log("Event Key: " + event.Key); context.log("Event Timestamp: " + event.Timestamp); context.log("Event Value (as string): " + event.Value); let event_obj = JSON.parse(event.Value); context.log("Event Value Object: "); context.log(" Value.registertime: ", event_obj.registertime.toString()); context.log(" Value.userid: ", event_obj.userid); context.log(" Value.regionid: ", event_obj.regionid); context.log(" Value.gender: ", event_obj.gender); } } app.generic("kafkaTriggerMany", { trigger: { type: "kafkaTrigger", direction: "in", name: "event", topic: "topic", brokerList: 
"%BrokerList%", username: "%ConfluentCloudUserName%", password: "%ConfluentCloudPassword%", consumerGroup: "$Default", protocol: "saslSsl", authenticationMode: "plain", dataType: "string", cardinality: "MANY" }, handler: kafkaTriggerMany, }); ``` # [Event Hubs](#tab/event-hubs/v4) :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript-v4/src/functions/kafkaTriggerMany.js" range="1-21,23-39"::: # [Confluent](#tab/confluent/v3) :::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerMany/function.confluent.json" ::: This code parses the array of events and logs the event data: :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerMany/index.js" ::: This code logs the header data: :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerManyWithHeaders/index.js" ::: # [Event Hubs](#tab/event-hubs/v3) :::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerMany/function.eventhub.json" ::: This code parses the array of events and logs the event data: :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerMany/index.js" ::: This code logs the header data: :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerManyWithHeaders/index.js" ::: --- You can define a generic [Avro schema] for the event passed to the trigger. 
This example defines the trigger for the specific provider with a generic Avro schema: # [Confluent](#tab/confluent/v4) ```javascript const { app } = require("@azure/functions"); async function kafkaAvroGenericTrigger(event, context) { context.log("Processed kafka event: ", event); if (context.triggerMetadata?.key !== undefined) { context.log("message key: ", context.triggerMetadata?.key); } } app.generic("kafkaAvroGenericTrigger", { trigger: { type: "kafkaTrigger", direction: "in", name: "event", protocol: "SASLSSL", password: "EventHubConnectionString", dataType: "string", topic: "topic", authenticationMode: "PLAIN", avroSchema: '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}', consumerGroup: "$Default", username: "$ConnectionString", brokerList: "%BrokerList%", }, handler: kafkaAvroGenericTrigger, }); ``` # [Event Hubs](#tab/event-hubs/v4) :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript-v4/src/functions/kafkaAvroGenericTrigger.js" range="1-9,11-28"::: # [Confluent](#tab/confluent/v3) :::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerAvroGeneric/function.confluent.json" ::: The following code runs when the function is triggered: :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerAvroGeneric/index.js" ::: # [Event Hubs](#tab/event-hubs/v3) :::code language="json" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerAvroGeneric/function.eventhub.json" ::: The following code runs when the function is triggered: :::code language="javascript" source="~/azure-functions-kafka-extension/samples/javascript/KafkaTriggerAvroGeneric/index.js" ::: --- # [Version 4](#tab/v4) For a complete set of working JavaScript examples, see the [Kafka extension 
repository](https://github.com/Azure/azure-functions-kafka-extension/tree/dev/samples/javascript-v4/src/functions). # [Version 3](#tab/v3) For a complete set of working JavaScript examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/javascript/). --- ::: zone-end ::: zone pivot="programming-language-typescript" # [Confluent](#tab/confluent/v4) ```typescript import { app, InvocationContext } from "@azure/functions"; // This is a sample interface that describes the actual data in your event. interface EventData { registertime: number; userid: string; regionid: string; gender: string; } export async function kafkaTrigger( event: any, context: InvocationContext ): Promise<void> { context.log("Event Offset: " + event.Offset); context.log("Event Partition: " + event.Partition); context.log("Event Topic: " + event.Topic); context.log("Event Timestamp: " + event.Timestamp); context.log("Event Value (as string): " + event.Value); let event_obj: EventData = JSON.parse(event.Value); context.log("Event Value Object: "); context.log(" Value.registertime: ", event_obj.registertime.toString()); context.log(" Value.userid: ", event_obj.userid); context.log(" Value.regionid: ", event_obj.regionid); context.log(" Value.gender: ", event_obj.gender); } app.generic("Kafkatrigger", { trigger: { type: "kafkaTrigger", direction: "in", name: "event", topic: "topic", brokerList: "%BrokerList%", username: "%ConfluentCloudUserName%", password: "%ConfluentCloudPassword%", consumerGroup: "$Default", protocol: "saslSsl", authenticationMode: "plain", dataType: "string" }, handler: kafkaTrigger, }); ``` # [Event Hubs](#tab/event-hubs/v4) :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript-v4/src/functions/kafkaTrigger.ts" range="1-29,31-46"::: # [Confluent](#tab/confluent/v3) This `function.json` file defines the trigger for the Confluent provider: :::code language="json" 
source="~/azure-functions-kafka-extension/samples/typescript/KafkaTrigger/function.confluent.json" ::: The following code runs when the function is triggered: :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTrigger/index.ts" ::: # [Event Hubs](#tab/event-hubs/v3) This `function.json` file defines the trigger for the Event Hubs provider: :::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTrigger/function.eventhub.json" ::: The following code runs when the function is triggered: :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTrigger/index.ts" ::: --- To receive events in a batch, set the `cardinality` value to `many`, as shown in these examples: # [Confluent](#tab/confluent/v4) ```typescript import { app, InvocationContext } from "@azure/functions"; // This is a sample interface that describes the actual data in your event. interface EventData { registertime: number; userid: string; regionid: string; gender: string; } interface KafkaEvent { Offset: number; Partition: number; Topic: string; Timestamp: number; Value: string; } export async function kafkaTriggerMany( events: any, context: InvocationContext ): Promise<void> { for (const event of events) { context.log("Event Offset: " + event.Offset); context.log("Event Partition: " + event.Partition); context.log("Event Topic: " + event.Topic); context.log("Event Timestamp: " + event.Timestamp); context.log("Event Value (as string): " + event.Value); let event_obj: EventData = JSON.parse(event.Value); context.log("Event Value Object: "); context.log(" Value.registertime: ", event_obj.registertime.toString()); context.log(" Value.userid: ", event_obj.userid); context.log(" Value.regionid: ", event_obj.regionid); context.log(" Value.gender: ", event_obj.gender); } } app.generic("kafkaTriggerMany", { trigger: { type: "kafkaTrigger", direction: "in", name: "event", topic: "topic", 
brokerList: "%BrokerList%", username: "%ConfluentCloudUserName%", password: "%ConfluentCloudPassword%", consumerGroup: "$Default", protocol: "saslSsl", authenticationMode: "plain", dataType: "string", cardinality: "MANY" }, handler: kafkaTriggerMany, }); ``` # [Event Hubs](#tab/event-hubs/v4) :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript-v4/src/functions/kafkaTriggerMany.ts" range="1-39,41-57"::: # [Confluent](#tab/confluent/v3) :::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerMany/function.confluent.json" ::: This code parses the array of events and logs the event data: :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerMany/index.ts" ::: This code logs the header data: :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerManyWithHeaders/index.ts" ::: # [Event Hubs](#tab/event-hubs/v3) :::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerMany/function.eventhub.json" ::: This code parses the array of events and logs the event data: :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerMany/index.ts" ::: This code logs the header data: :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerManyWithHeaders/index.ts" ::: --- You can define a generic [Avro schema] for the event passed to the trigger. 
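As a rough sketch of what a generic Avro schema implies for handler code, the following TypeScript models the `Payment` record used in this article's samples (string `id`, double `amount`, string `type`) and checks an incoming value against that shape before any field is read. The `Payment` interface and the `isPayment` and `toPayment` helpers are hypothetical names chosen for illustration; they aren't part of `@azure/functions` or the Kafka extension. The sketch also assumes that the decoded record reaches the handler either as an object or as a JSON string, which might not match every configuration.

```typescript
// Shape implied by the sample Avro schema: record "Payment" with
// a string `id`, a double `amount`, and a string `type`.
interface Payment {
  id: string;
  amount: number;
  type: string;
}

// Hypothetical helper: narrows an unknown value to Payment by checking
// that each field named in the schema is present with the expected type.
function isPayment(value: unknown): value is Payment {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const record = value as Record<string, unknown>;
  return (
    typeof record.id === "string" &&
    typeof record.amount === "number" &&
    typeof record.type === "string"
  );
}

// Hypothetical helper: accepts an already-decoded object or a JSON string
// (an assumption about how the payload arrives) and returns a Payment,
// or undefined when the value can't be parsed or doesn't match the shape.
function toPayment(event: unknown): Payment | undefined {
  let candidate: unknown = event;
  if (typeof event === "string") {
    try {
      candidate = JSON.parse(event);
    } catch {
      return undefined;
    }
  }
  return isPayment(candidate) ? candidate : undefined;
}
```

In a handler, you could call `toPayment(event)` first and log or skip the message when it returns `undefined`, rather than reading `event.id` or `event.amount` directly from a value typed as `any`.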
This example defines the trigger for the specific provider with a generic Avro schema: # [Confluent](#tab/confluent/v4) :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript-v4/src/functions/kafkaAvroGenericTrigger.ts" range="1-15,17-34"::: # [Event Hubs](#tab/event-hubs/v4) ```typescript import { app, InvocationContext } from "@azure/functions"; export async function kafkaAvroGenericTrigger( event: any, context: InvocationContext ): Promise<void> { context.log("Processed kafka event: ", event); context.log( `Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}` ); if (context.triggerMetadata?.key !== undefined) { context.log(`Message Key : ${context.triggerMetadata?.key}`); } } app.generic("kafkaAvroGenericTrigger", { trigger: { type: "kafkaTrigger", direction: "in", name: "event", protocol: "SASLSSL", password: "EventHubConnectionString", dataType: "string", topic: "topic", authenticationMode: "PLAIN", avroSchema: '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}', consumerGroup: "$Default", username: "$ConnectionString", brokerList: "%BrokerList%", }, handler: kafkaAvroGenericTrigger, }); ``` # [Confluent](#tab/confluent/v3) :::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerAvroGeneric/function.confluent.json" ::: The following code runs when the function is triggered: :::code language="typescript" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerAvroGeneric/index.ts" ::: # [Event Hubs](#tab/event-hubs/v3) :::code language="json" source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerAvroGeneric/function.eventhub.json" ::: The following code runs when the function is triggered: :::code language="typescript" 
source="~/azure-functions-kafka-extension/samples/typescript/KafkaTriggerAvroGeneric/index.ts" ::: --- # [Version 4](#tab/v4) For a complete set of working TypeScript examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/tree/dev/samples/typescript-v4/src/functions). # [Version 3](#tab/v3) For a complete set of working TypeScript examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/typescript/). --- ::: zone-end ::: zone pivot="programming-language-powershell" The specific properties of the `function.json` file depend on your event provider. In these examples, the event providers are either Confluent or Azure Event Hubs. The following examples show a Kafka trigger for a function that reads and logs a Kafka message. The following `function.json` file defines the trigger for the specific provider: # [Confluent](#tab/confluent) :::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTrigger/function.confluent.json" ::: # [Event Hubs](#tab/event-hubs) :::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTrigger/function.eventhub.json" ::: --- The following code runs when the function is triggered: :::code language="powershell" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTrigger/run.ps1" ::: To receive events in a batch, set the `cardinality` value to `many` in the function.json file, as shown in the following examples: # [Confluent](#tab/confluent) :::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerMany/function.confluent.json" ::: # [Event Hubs](#tab/event-hubs) :::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerMany/function.eventhub.json" ::: --- The following code parses the array of events and logs the event data: :::code language="powershell" 
source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerMany/run.ps1" ::: The following code logs the header data: :::code language="powershell" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerManyWithHeaders/run.ps1" ::: You can define a generic [Avro schema] for the event passed to the trigger. The following function.json defines the trigger for the specific provider with a generic Avro schema: # [Confluent](#tab/confluent) :::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerAvroGeneric/function.confluent.json" ::: # [Event Hubs](#tab/event-hubs) :::code language="json" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerAvroGeneric/function.eventhub.json" ::: --- The following code runs when the function is triggered: :::code language="powershell" source="~/azure-functions-kafka-extension/samples/powershell/KafkaTriggerAvroGeneric/run.ps1" ::: For a complete set of working PowerShell examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/powershell/). ::: zone-end ::: zone pivot="programming-language-python" The usage of the trigger depends on your version of the Python programming model. # [Version 2](#tab/v2) In the Python v2 model, you define your trigger directly in your function code using decorators. For more information, see the [Azure Functions Python developer guide](functions-reference-python.md?pivots=python-mode-decorators). # [Version 1](#tab/v1) In the Python v1 model, you define your trigger in the `function.json` with your function code. For more information, see the [Azure Functions Python developer guide](functions-reference-python.md?pivots=python-mode-configuration). --- These examples show how to define a Kafka trigger for a function that reads a Kafka message. 
# [Version 2](#tab/v2) :::code language="python" source="~/azure-functions-kafka-extension/samples/python-v2/kafka_trigger.py" range="10-22" ::: # [Version 1](#tab/v1) This `function.json` file defines the trigger: :::code language="json" source="~/azure-functions-kafka-extension/samples/python/KafkaTrigger/function.confluent.json" ::: This code runs when the function is triggered: :::code language="python" source="~/azure-functions-kafka-extension/samples/python/KafkaTrigger/main.py" ::: --- This example receives events in a batch by setting the `cardinality` value to `many`. # [Version 2](#tab/v2) :::code language="python" source="~/azure-functions-kafka-extension/samples/python-v2/kafka_trigger.py" range="24-38" ::: # [Version 1](#tab/v1) :::code language="json" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerMany/function.confluent.json" ::: This code parses the array of events and logs the event data: :::code language="python" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerMany/main.py" ::: This code logs the header data: :::code language="python" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerManyWithHeaders/__init__.py" ::: --- You can define a generic [Avro schema] for the event passed to the trigger. 
# [Version 2](#tab/v2) :::code language="python" source="~/azure-functions-kafka-extension/samples/python-v2/kafka_trigger_avro.py" range="24-37" ::: # [Version 1](#tab/v1) This `function.json` defines the trigger with a generic Avro schema: :::code language="json" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerAvroGeneric/function.confluent.json" ::: This code runs when the function is triggered: :::code language="python" source="~/azure-functions-kafka-extension/samples/python/KafkaTriggerAvroGeneric/main.py" ::: --- # [Version 2](#tab/v2) For a complete set of working Python examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/python-v2/). # [Version 1](#tab/v1) For a complete set of working Python examples, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/python/). --- ::: zone-end ::: zone pivot="programming-language-java" The annotations you use to configure your trigger depend on the specific event provider. # [Confluent](#tab/confluent) The following example shows a Java function that reads and logs the content of the Kafka event: :::code language="java" source="~/azure-functions-kafka-extension/samples/java/confluent/src/main/java/com/contoso/kafka/SampleKafkaTrigger.java" range="19-35" ::: To receive events in a batch, use an input string as an array, as shown in the following example: :::code language="java" source="~/azure-functions-kafka-extension/samples/java/confluent/src/main/java/com/contoso/kafka/KafkaTriggerMany.java" range="8-27" ::: The following function logs the message and headers for the Kafka Event: :::code language="java" source="~/azure-functions-kafka-extension/samples/java/confluent/src/main/java/com/contoso/kafka/KafkaTriggerManyWithHeaders.java" range="12-38" ::: You can define a generic [Avro schema] for the event passed to the trigger. 
The following function defines a trigger for the specific provider with a generic Avro schema:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/confluent/src/main/java/com/contoso/kafka/avro/generic/KafkaTriggerAvroGeneric.java" range="15-31" :::

For a complete set of working Java examples for Confluent, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/tree/dev/samples/java/confluent/src/main/java/com/contoso/kafka).

# [Event Hubs](#tab/event-hubs)

The following example shows a Java function that reads and logs the content of the Kafka event:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/eventhub/src/main/java/com/contoso/kafka/SampleKafkaTrigger.java" range="19-35" :::

To receive events in a batch, use a string array as input, as shown in the following example:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/eventhub/src/main/java/com/contoso/kafka/KafkaTriggerMany.java" range="8-27" :::

The following function logs the message and headers for the Kafka event:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/eventhub/src/main/java/com/contoso/kafka/KafkaTriggerManyWithHeaders.java" range="12-38" :::

You can define a generic [Avro schema] for the event passed to the trigger. The following function defines a trigger for the specific provider with a generic Avro schema:

:::code language="java" source="~/azure-functions-kafka-extension/samples/java/eventhub/src/main/java/com/contoso/kafka/avro/generic/KafkaTriggerAvroGeneric.java" range="15-31" :::

For a complete set of working Java examples for Event Hubs, see the [Kafka extension repository](https://github.com/Azure/azure-functions-kafka-extension/tree/dev/samples/java/eventhub/src/main/java/com/contoso/kafka).
---

::: zone-end
::: zone pivot="programming-language-csharp"

## Attributes

Both [in-process](functions-dotnet-class-library.md) and [isolated worker process](dotnet-isolated-process-guide.md) C# libraries use the `KafkaTriggerAttribute` to define the function trigger.

The following table explains the properties you can set by using this trigger attribute:

| Parameter | Description |
| --- | --- |
| **BrokerList** | (Required) The list of Kafka brokers monitored by the trigger. See [Connections](#connections) for more information. |
| **Topic** | (Required) The topic monitored by the trigger. |
| **ConsumerGroup** | (Optional) Kafka consumer group used by the trigger. |
| **AvroSchema** | (Optional) Schema of a generic record of message value when using the Avro protocol. |
| **KeyAvroSchema** | (Optional) Schema of a generic record of message key when using the Avro protocol. |
| **KeyDataType** | (Optional) The data type in which to receive the message key from the Kafka topic. If `KeyAvroSchema` is set, this value is a generic record. Accepted values are `Int`, `Long`, `String`, and `Binary`. |
| **AuthenticationMode** | (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are `NotSet` (default), `Gssapi`, `Plain`, `ScramSha256`, `ScramSha512`, and `OAuthBearer`. |
| **Username** | (Optional) The username for SASL authentication. Not supported when `AuthenticationMode` is `Gssapi`. See [Connections](#connections) for more information. |
| **Password** | (Optional) The password for SASL authentication. Not supported when `AuthenticationMode` is `Gssapi`. See [Connections](#connections) for more information. |
| **Protocol** | (Optional) The security protocol used when communicating with brokers. The supported values are `NotSet` (default), `plaintext`, `ssl`, `sasl_plaintext`, and `sasl_ssl`. |
| **SslCaLocation** | (Optional) Path to CA certificate file for verifying the broker's certificate. |
| **SslCertificateLocation** | (Optional) Path to the client's certificate. |
| **SslKeyLocation** | (Optional) Path to client's private key (PEM) used for authentication. |
| **SslKeyPassword** | (Optional) Password for client's certificate. |
| **SslCertificatePEM** | (Optional) Client certificate in PEM format as a string. See [Connections](#connections) for more information. |
| **SslKeyPEM** | (Optional) Client private key in PEM format as a string. See [Connections](#connections) for more information. |
| **SslCaPEM** | (Optional) CA certificate in PEM format as a string. See [Connections](#connections) for more information. |
| **SslCertificateandKeyPEM** | (Optional) Client certificate and key in PEM format as a string. See [Connections](#connections) for more information. |
| **SchemaRegistryUrl** | (Optional) URL for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **SchemaRegistryUsername** | (Optional) Username for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **SchemaRegistryPassword** | (Optional) Password for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **OAuthBearerMethod** | (Optional) OAuth Bearer method. Accepted values are `oidc` and `default`. |
| **OAuthBearerClientId** | (Optional) When `OAuthBearerMethod` is set to `oidc`, this specifies the OAuth bearer client ID. See [Connections](#connections) for more information. |
| **OAuthBearerClientSecret** | (Optional) When `OAuthBearerMethod` is set to `oidc`, this specifies the OAuth bearer client secret. See [Connections](#connections) for more information. |
| **OAuthBearerScope** | (Optional) Specifies the scope of the access request to the broker. |
| **OAuthBearerTokenEndpointUrl** | (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve the token when the `oidc` method is used. See [Connections](#connections) for more information. |
| **OAuthBearerExtensions** | (Optional) Comma-separated list of key=value pairs to be provided as additional information to the broker when the `oidc` method is used. For example: `supportFeatureX=true,organizationId=sales-emea`. |

::: zone-end
::: zone pivot="programming-language-java"

## Annotations

The `KafkaTrigger` annotation enables you to create a function that runs when a message is received on a topic. Supported options include the following elements:

| Element | Description |
|---------|----------------------|
| **name** | (Required) The name of the variable that represents the queue or topic message in function code. |
| **brokerList** | (Required) The list of Kafka brokers monitored by the trigger. See [Connections](#connections) for more information. |
| **topic** | (Required) The topic monitored by the trigger. |
| **cardinality** | (Optional) Indicates the cardinality of the trigger input. The supported values are `ONE` (default) and `MANY`. Use `ONE` when the input is a single message and `MANY` when the input is an array of messages. When you use `MANY`, you must also set a `dataType`. |
| **dataType** | Defines how Functions handles the parameter value. By default, the value is obtained as a string and Functions tries to deserialize the string to an actual plain-old Java object (POJO). When `string`, the input is treated as just a string. When `binary`, the message is received as binary data, and Functions tries to deserialize it to an actual `byte[]` parameter type. |
| **consumerGroup** | (Optional) Kafka consumer group used by the trigger. |
| **avroSchema** | (Optional) Schema of a generic record when using the Avro protocol. |
| **authenticationMode** | (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are `NotSet` (default), `Gssapi`, `Plain`, `ScramSha256`, and `ScramSha512`. |
| **username** | (Optional) The username for SASL authentication. Not supported when `authenticationMode` is `Gssapi`. See [Connections](#connections) for more information. |
| **password** | (Optional) The password for SASL authentication. Not supported when `authenticationMode` is `Gssapi`. See [Connections](#connections) for more information. |
| **protocol** | (Optional) The security protocol used when communicating with brokers. The supported values are `NotSet` (default), `plaintext`, `ssl`, `sasl_plaintext`, and `sasl_ssl`. |
| **sslCaLocation** | (Optional) Path to CA certificate file for verifying the broker's certificate. |
| **sslCertificateLocation** | (Optional) Path to the client's certificate. |
| **sslKeyLocation** | (Optional) Path to client's private key (PEM) used for authentication. |
| **sslKeyPassword** | (Optional) Password for client's certificate. |
| **lagThreshold** | (Optional) Lag threshold for the trigger. |
| **schemaRegistryUrl** | (Optional) URL for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **schemaRegistryUsername** | (Optional) Username for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **schemaRegistryPassword** | (Optional) Password for the Avro Schema Registry. See [Connections](#connections) for more information. |

::: zone-end
::: zone pivot="programming-language-javascript,programming-language-powershell"

## Configuration

The following table explains the binding configuration properties that you set in the *function.json* file.

| _function.json_ property | Description |
| --- | --- |
| **type** | (Required) Set to `kafkaTrigger`. |
| **direction** | (Required) Set to `in`. |
| **name** | (Required) The name of the variable that represents the brokered data in function code. |
| **brokerList** | (Required) The list of Kafka brokers monitored by the trigger. See [Connections](#connections) for more information. |
| **topic** | (Required) The topic monitored by the trigger. |
| **cardinality** | (Optional) Indicates the cardinality of the trigger input. The supported values are `ONE` (default) and `MANY`. Use `ONE` when the input is a single message and `MANY` when the input is an array of messages. When you use `MANY`, you must also set a `dataType`. |
| **dataType** | Defines how Functions handles the parameter value. By default, the value is obtained as a string and Functions tries to deserialize the string to an actual plain-old Java object (POJO). When `string`, the input is treated as just a string. When `binary`, the message is received as binary data, and Functions tries to deserialize it to an actual byte array parameter type. |
| **consumerGroup** | (Optional) Kafka consumer group used by the trigger. |
| **avroSchema** | (Optional) Schema of a generic record when using the Avro protocol. |
| **keyAvroSchema** | (Optional) Schema of a generic record of message key when using the Avro protocol. |
| **keyDataType** | (Optional) The data type in which to receive the message key from the Kafka topic. If `keyAvroSchema` is set, this value is a generic record. Accepted values are `Int`, `Long`, `String`, and `Binary`. |
| **authenticationMode** | (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are `NotSet` (default), `Gssapi`, `Plain`, `ScramSha256`, and `ScramSha512`. |
| **username** | (Optional) The username for SASL authentication. Not supported when `authenticationMode` is `Gssapi`. See [Connections](#connections) for more information. |
| **password** | (Optional) The password for SASL authentication. Not supported when `authenticationMode` is `Gssapi`. See [Connections](#connections) for more information. |
| **protocol** | (Optional) The security protocol used when communicating with brokers. The supported values are `NotSet` (default), `plaintext`, `ssl`, `sasl_plaintext`, and `sasl_ssl`. |
| **sslCaLocation** | (Optional) Path to CA certificate file for verifying the broker's certificate. |
| **sslCertificateLocation** | (Optional) Path to the client's certificate. |
| **sslKeyLocation** | (Optional) Path to client's private key (PEM) used for authentication. |
| **sslKeyPassword** | (Optional) Password for client's certificate. |
| **sslCertificatePEM** | (Optional) Client certificate in PEM format as a string. See [Connections](#connections) for more information. |
| **sslKeyPEM** | (Optional) Client private key in PEM format as a string. See [Connections](#connections) for more information. |
| **sslCaPEM** | (Optional) CA certificate in PEM format as a string. See [Connections](#connections) for more information. |
| **sslCertificateandKeyPEM** | (Optional) Client certificate and key in PEM format as a string. See [Connections](#connections) for more information. |
| **lagThreshold** | (Optional) Lag threshold for the trigger. |
| **schemaRegistryUrl** | (Optional) URL for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **schemaRegistryUsername** | (Optional) Username for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **schemaRegistryPassword** | (Optional) Password for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **oAuthBearerMethod** | (Optional) OAuth Bearer method. Accepted values are `oidc` and `default`. |
| **oAuthBearerClientId** | (Optional) When `oAuthBearerMethod` is set to `oidc`, this specifies the OAuth bearer client ID. See [Connections](#connections) for more information. |
| **oAuthBearerClientSecret** | (Optional) When `oAuthBearerMethod` is set to `oidc`, this specifies the OAuth bearer client secret. See [Connections](#connections) for more information. |
| **oAuthBearerScope** | (Optional) Specifies the scope of the access request to the broker. |
| **oAuthBearerTokenEndpointUrl** | (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve the token when the `oidc` method is used. See [Connections](#connections) for more information. |

::: zone-end
::: zone pivot="programming-language-python"

## Configuration

The following table explains the binding configuration properties that you set in the *function.json* file. Python uses snake_case naming conventions for configuration properties.

| _function.json_ property | Description |
| --- | --- |
| **type** | (Required) Set to `kafkaTrigger`. |
| **direction** | (Required) Set to `in`. |
| **name** | (Required) The name of the variable that represents the brokered data in function code. |
| **broker_list** | (Required) The list of Kafka brokers monitored by the trigger. See [Connections](#connections) for more information. |
| **topic** | (Required) The topic monitored by the trigger. |
| **cardinality** | (Optional) Indicates the cardinality of the trigger input. The supported values are `ONE` (default) and `MANY`. Use `ONE` when the input is a single message and `MANY` when the input is an array of messages. When you use `MANY`, you must also set a `data_type`. |
| **data_type** | Defines how Functions handles the parameter value. By default, the value is obtained as a string and Functions tries to deserialize the string to an actual plain-old Java object (POJO). When `string`, the input is treated as just a string. When `binary`, the message is received as binary data, and Functions tries to deserialize it to an actual `byte[]` parameter type. |
| **consumerGroup** | (Optional) Kafka consumer group used by the trigger. |
| **avroSchema** | (Optional) Schema of a generic record when using the Avro protocol. |
| **authentication_mode** | (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are `NOTSET` (default), `Gssapi`, `Plain`, `ScramSha256`, and `ScramSha512`. |
| **username** | (Optional) The username for SASL authentication. Not supported when `authentication_mode` is `Gssapi`. See [Connections](#connections) for more information. |
| **password** | (Optional) The password for SASL authentication. Not supported when `authentication_mode` is `Gssapi`. See [Connections](#connections) for more information. |
| **protocol** | (Optional) The security protocol used when communicating with brokers. The supported values are `NOTSET` (default), `plaintext`, `ssl`, `sasl_plaintext`, and `sasl_ssl`. |
| **sslCaLocation** | (Optional) Path to CA certificate file for verifying the broker's certificate. |
| **sslCertificateLocation** | (Optional) Path to the client's certificate. |
| **sslKeyLocation** | (Optional) Path to client's private key (PEM) used for authentication. |
| **sslKeyPassword** | (Optional) Password for client's certificate. |
| **lag_threshold** | (Optional) Lag threshold for the trigger. |
| **schema_registry_url** | (Optional) URL for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **schema_registry_username** | (Optional) Username for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **schema_registry_password** | (Optional) Password for the Avro Schema Registry. See [Connections](#connections) for more information. |
| **o_auth_bearer_method** | (Optional) OAuth Bearer method. Accepted values are `oidc` and `default`. |
| **o_auth_bearer_client_id** | (Optional) When `o_auth_bearer_method` is set to `oidc`, this specifies the OAuth bearer client ID. See [Connections](#connections) for more information. |
| **o_auth_bearer_client_secret** | (Optional) When `o_auth_bearer_method` is set to `oidc`, this specifies the OAuth bearer client secret. See [Connections](#connections) for more information. |
| **o_auth_bearer_scope** | (Optional) Specifies the scope of the access request to the broker. |
| **o_auth_bearer_token_endpoint_url** | (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve the token when the `oidc` method is used. See [Connections](#connections) for more information. |

> [!NOTE]
> Certificate PEM-related properties and Avro key-related properties aren't yet available in the Python library.

::: zone-end

## Usage

::: zone pivot="programming-language-csharp"

# [Isolated worker model](#tab/isolated-process)

The Kafka trigger currently supports Kafka events as strings and string arrays that are JSON payloads.

# [In-process model](#tab/in-process)

The Kafka trigger passes Kafka events to the function as `KafkaEventData<string>` objects or arrays. The trigger also supports strings and string arrays that are JSON payloads.

---

::: zone-end
::: zone pivot="programming-language-javascript,programming-language-python,programming-language-powershell"

The Kafka trigger passes Kafka messages to the function as strings. The trigger also supports string arrays that are JSON payloads.

::: zone-end

In a Premium plan, you must enable runtime scale monitoring for the Kafka trigger to scale out to multiple instances. To learn more, see [Enable runtime scaling](functions-bindings-kafka.md#enable-runtime-scaling).

You can't use the **Test/Run** feature of the **Code + Test** page in the Azure portal to work with Kafka triggers. You must instead send test events directly to the topic being monitored by the trigger.

For a complete set of supported host.json settings for the Kafka trigger, see [host.json settings](functions-bindings-kafka.md#hostjson-settings).

[!INCLUDE [functions-bindings-kafka-connections](../../includes/functions-bindings-kafka-connections.md)]

## Next steps

- [Write to an Apache Kafka stream from a function](./functions-bindings-kafka-output.md)

[Avro schema]: http://avro.apache.org/docs/current/