Proposed Pull Request Change

title description ms.topic ms.date zone_pivot_groups ms.custom
Quickstart: Create an app with Durable Task SDKs and Durable Task Scheduler Learn how to configure an existing app for the Durable Task Scheduler using the Durable Task SDKs. how-to 05/06/2025 df-languages ['build-2025']
📄 Document Links
GitHub View on GitHub Microsoft Learn View on Microsoft Learn
Raw New Markdown
Generating updated version of doc...
Rendered New Markdown
Generating updated version of doc...
+0 -0
+0 -0
--- title: "Quickstart: Create an app with Durable Task SDKs and Durable Task Scheduler" description: Learn how to configure an existing app for the Durable Task Scheduler using the Durable Task SDKs. ms.topic: how-to ms.date: 05/06/2025 zone_pivot_groups: df-languages ms.custom: - build-2025 --- # Quickstart: Create an app with Durable Task SDKs and Durable Task Scheduler The Durable Task SDKs provide a lightweight client library for the Durable Task Scheduler. In this quickstart, you learn how to create orchestrations that use [the fan-out/fan-in application pattern](../durable-functions-overview.md#fan-in-out) to perform parallel processing. ::: zone pivot="javascript" [!INCLUDE [preview-sample-limitations](./includes/preview-sample-limitations.md)] ::: zone-end ::: zone pivot="powershell" [!INCLUDE [preview-sample-limitations](./includes/preview-sample-limitations.md)] ::: zone-end ::: zone pivot="csharp,python,java" > [!div class="checklist"] > > - Set up and run the Durable Task Scheduler emulator for local development. > - Run the worker and client projects. > - Review orchestration status and history via the Durable Task Scheduler dashboard. ## Prerequisites Before you begin: ::: zone-end ::: zone pivot="csharp" - Make sure you have [.NET 8 SDK](https://dotnet.microsoft.com/download/dotnet/8.0) or later. - Install [Docker](https://www.docker.com/products/docker-desktop/) for running the emulator. - Clone the [Durable Task Scheduler GitHub repository](https://github.com/Azure-Samples/Durable-Task-Scheduler) to use the quickstart sample. ::: zone-end ::: zone pivot="python" - Make sure you have [Python 3.9+](https://www.python.org/downloads/) or later. - Install [Docker](https://www.docker.com/products/docker-desktop/) for running the emulator. - Clone the [Durable Task Scheduler GitHub repository](https://github.com/Azure-Samples/Durable-Task-Scheduler) to use the quickstart sample. ::: zone-end ::: zone pivot="java" - Make sure you have [Java 8 or 11](https://www.java.com/en/download/). - Install [Docker](https://www.docker.com/products/docker-desktop/) for running the emulator. - Clone the [Durable Task Scheduler GitHub repository](https://github.com/Azure-Samples/Durable-Task-Scheduler) to use the quickstart sample. ::: zone-end ::: zone pivot="csharp,python,java" ## Set up the Durable Task Scheduler emulator The application code looks for a deployed scheduler and task hub resource. If none is found, the code falls back to the emulator. The emulator simulates a scheduler and task hub in a Docker container, making it ideal for the local development required in this quickstart. ::: zone-end ::: zone pivot="csharp" 1. From the `Azure-Samples/Durable-Task-Scheduler` root directory, navigate to the .NET SDK sample directory. ```bash cd samples/durable-task-sdks/dotnet/FanOutFanIn ``` 1. Pull the Docker image for the emulator. ```bash docker pull mcr.microsoft.com/dts/dts-emulator:latest ``` 1. Run the emulator. The container may take a few seconds to be ready. ```bash docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest ``` Since the example code automatically uses the default emulator settings, you don't need to set any environment variables. The default emulator settings for this quickstart are: - Endpoint: `http://localhost:8080` - Task hub: `default` ::: zone-end ::: zone pivot="python" 1. From the `Azure-Samples/Durable-Task-Scheduler` root directory, navigate to the Python SDK sample directory. ```bash cd samples/durable-task-sdks/python/fan-out-fan-in ``` 1. Pull the Docker image for the emulator. ```bash docker pull mcr.microsoft.com/dts/dts-emulator:latest ``` 1. Run the emulator. The container may take a few seconds to be ready. ```bash docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest ``` Since the example code automatically uses the default emulator settings, you don't need to set any environment variables. The default emulator settings for this quickstart are: - Endpoint: `http://localhost:8080` - Task hub: `default` ::: zone-end ::: zone pivot="java" 1. From the `Azure-Samples/Durable-Task-Scheduler` root directory, navigate to the Java SDK sample directory. ```bash cd samples/durable-task-sdks/java/fan-out-fan-in ``` 1. Pull the Docker image for the emulator. ```bash docker pull mcr.microsoft.com/dts/dts-emulator:latest ``` 1. Run the emulator. The container may take a few seconds to be ready. ```bash docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest ``` Since the example code automatically uses the default emulator settings, you don't need to set any environment variables. The default emulator settings for this quickstart are: - Endpoint: `http://localhost:8080` - Task hub: `default` ::: zone-end ::: zone pivot="csharp,python,java" ## Run the quickstart ::: zone-end ::: zone pivot="csharp" 1. From the `FanOutFanIn` directory, navigate to the `Worker` directory to build and run the worker. ```bash cd Worker dotnet build dotnet run ``` 1. In a separate terminal, from the `FanOutFanIn` directory, navigate to the `Client` directory to build and run the client. ```bash cd Client dotnet build dotnet run ``` ### Understanding the output When you run this sample, you receive output from both the worker and client processes. [Unpack what happened in the code when you ran the project.](#understanding-the-code-structure) #### Worker output The worker output shows: - Registration of the orchestrator and activities - Log entries when each activity is called - Parallel processing of multiple work items - Final aggregation of results #### Client output The client output shows: - The orchestration starting with a list of work items - The unique orchestration instance ID - The final aggregated results, showing each work item and its corresponding result - Total count of processed items #### Example output ``` Starting Fan-Out Fan-In Pattern - Parallel Processing Client Using local emulator with no authentication Starting parallel processing orchestration with 5 work items Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"] Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b Waiting for orchestration to complete... Orchestration completed with status: Completed Processing results: Work item: Task1, Result: 5 Work item: Task2, Result: 5 Work item: Task3, Result: 5 Work item: LongerTask4, Result: 11 Work item: VeryLongTask5, Result: 13 Total items processed: 5 ``` ::: zone-end ::: zone pivot="python" 1. Activate a Python virtual environment. # [Windows](#tab/windows) ```bash python -m venv venv /venv/Scripts/activate ``` # [Linux](#tab/linux) ```bash python -m venv venv source ./venv/bin/activate ``` --- 1. Install the required packages. ```bash pip install -r requirements.txt ``` 1. Start the worker. ```bash python worker.py ``` **Expected output** You can see output indicating that the worker started and is waiting for work items. ``` Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items... ``` 1. In a new terminal, activate the virtual environment and run the client. # [Windows](#tab/windows) ```bash venv/Scripts/activate python client.py ``` # [Linux](#tab/linux) ```bash source ./venv/bin/activate python client.py ``` --- You can provide the number of work items as an argument. If no argument is provided, the example runs 10 items by default. ```bash python client.py [number_of_items] ``` ### Understanding the output When you run this sample, you receive output from both the worker and client processes. [Unpack what happened in the code when you ran the project.](#understanding-the-code-structure) #### Worker output The worker output shows: - Registration of the orchestrator and activities. - Status messages when processing each work item in parallel, showing that they're executing concurrently. - Random delays for each work item (between 0.5 and 2 seconds) to simulate varying processing times. - A final message showing the aggregation of results. #### Client output The client output shows: - The orchestration starting with the specified number of work items. - The unique orchestration instance ID. - The final aggregated result, which includes: - The total number of items processed - The sum of all results (each item result is the square of its value) - The average of all results #### Example output ``` Starting fan out/fan in orchestration with 10 items Waiting for 10 parallel tasks to complete Orchestrator yielded with 10 task(s) and 0 event(s) outstanding. Processing work item: 1 Processing work item: 2 Processing work item: 10 Processing work item: 9 Processing work item: 8 Processing work item: 7 Processing work item: 6 Processing work item: 5 Processing work item: 4 Processing work item: 3 Orchestrator yielded with 9 task(s) and 0 event(s) outstanding. Orchestrator yielded with 8 task(s) and 0 event(s) outstanding. Orchestrator yielded with 7 task(s) and 0 event(s) outstanding. Orchestrator yielded with 6 task(s) and 0 event(s) outstanding. Orchestrator yielded with 5 task(s) and 0 event(s) outstanding. Orchestrator yielded with 4 task(s) and 0 event(s) outstanding. Orchestrator yielded with 3 task(s) and 0 event(s) outstanding. Orchestrator yielded with 2 task(s) and 0 event(s) outstanding. Orchestrator yielded with 1 task(s) and 0 event(s) outstanding. All parallel tasks completed, aggregating results Orchestrator yielded with 1 task(s) and 0 event(s) outstanding. Aggregating results from 10 items Orchestration completed with status: COMPLETED ``` ::: zone-end ::: zone pivot="java" From the `fan-out-fan-in` directory, build and run the application using Gradle. ```bash ./gradlew runFanOutFanInPattern ``` > [!TIP] > If you receive the error message `zsh: permission denied: ./gradlew`, try running `chmod +x gradlew` before running the application. ### Understanding the output When you run this sample, you receive output that shows: - Registration of the orchestrator and activities. - Status messages when processing each work item in parallel, showing that they're executing concurrently. - Random delays for each work item (between 0.5 and 2 seconds) to simulate varying processing times. - A final message showing the aggregation of results. [Unpack what happened in the code when you ran the project.](#understanding-the-code-structure) #### Example output ``` Starting a Gradle Daemon (subsequent builds will be faster) > Task :runFanOutFanInPattern Durable Task worker is connecting to sidecar at localhost:8080. Started new orchestration instance Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60'] Output: 60 ``` ::: zone-end ::: zone pivot="csharp,python,java" Now that you ran the project locally, you can now learn how to [deploy to Azure hosted in Azure Container Apps](#next-steps). ## View orchestration status and history You can view the orchestration status and history via the [Durable Task Scheduler dashboard](./durable-task-scheduler-dashboard.md). By default, the dashboard runs on port 8082. 1. Navigate to http://localhost:8082 in your web browser. 1. Click the **default** task hub. The orchestration instance you created is in the list. 1. Click the orchestration instance ID to view the execution details, which include: - The parallel execution of multiple activity tasks - The fan-in aggregation step - The input and output at each step - The time taken for each step ::: zone-end ::: zone pivot="csharp" :::image type="content" source="./media/quickstart-portable-durable-task-sdks/review-dashboard-dotnet.png" alt-text="Screenshot showing the orchestration instance's details for the .NET sample."::: ::: zone-end ::: zone pivot="python" :::image type="content" source="./media/quickstart-portable-durable-task-sdks/review-dashboard-python.png" alt-text="Screenshot showing the orchestration instance's details for the Python sample."::: ::: zone-end ::: zone pivot="java" :::image type="content" source="./media/quickstart-portable-durable-task-sdks/review-dashboard-java.png" alt-text="Screenshot showing the orchestration instance's details for the Java sample."::: ::: zone-end ::: zone pivot="csharp,python,java" ## Understanding the code structure ::: zone-end ::: zone pivot="csharp" ### The worker project To demonstrate [the fan-out/fan-in pattern](../durable-functions-overview.md#fan-in-out), the worker project orchestration creates parallel activity tasks and waits for all to complete. The orchestrator: 1. Takes a list of work items as input. 1. Fans out by creating a separate task for each work item using `ProcessWorkItemActivity`. 1. Executes all tasks in parallel. 1. Waits for all tasks to complete using `Task.WhenAll`. 1. Fans in by aggregating all individual results using `AggregateResultsActivity`. 1. Returns the final aggregated result to the client. The worker project contains: - **ParallelProcessingOrchestration.cs**: Defines the orchestrator and activity functions in a single file. - **Program.cs**: Sets up the worker host with proper connection string handling. #### `ParallelProcessingOrchestration.cs` Using fan-out/fan-in, the orchestration creates parallel activity tasks and waits for all to complete. ```csharp public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems) { // Step 1: Fan-out by creating a task for each work item in parallel List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>(); foreach (string workItem in workItems) { // Create a task for each work item (fan-out) Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>( nameof(ProcessWorkItemActivity), workItem); processingTasks.Add(task); } // Step 2: Wait for all parallel tasks to complete Dictionary<string, int>[] results = await Task.WhenAll(processingTasks); // Step 3: Fan-in by aggregating all results Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>( nameof(AggregateResultsActivity), results); return aggregatedResults; } ``` Each activity is implemented as a separate class decorated with the `[DurableTask]` attribute. ```csharp [DurableTask] public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>> { // Implementation processes a single work item } [DurableTask] public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>> { // Implementation aggregates individual results } ``` #### `Program.cs` The worker uses `Microsoft.Extensions.Hosting` for proper lifecycle management. ```csharp using Microsoft.Extensions.Hosting; //.. builder.Services.AddDurableTaskWorker() .AddTasks(registry => { registry.AddOrchestrator<ParallelProcessingOrchestration>(); registry.AddActivity<ProcessWorkItemActivity>(); registry.AddActivity<AggregateResultsActivity>(); }) .UseDurableTaskScheduler(connectionString); ``` ### The client project The client project: - Uses the same connection string logic as the worker. - Creates a list of work items to be processed in parallel. - Schedules an orchestration instance with the list as input. - Waits for the orchestration to complete and displays the aggregated results. - Uses `WaitForInstanceCompletionAsync` for efficient polling. ```csharp List<string> workItems = new List<string> { "Task1", "Task2", "Task3", "LongerTask4", "VeryLongTask5" }; // Schedule the orchestration with the work items string instanceId = await client.ScheduleNewOrchestrationInstanceAsync( "ParallelProcessingOrchestration", workItems); // Wait for completion var instance = await client.WaitForInstanceCompletionAsync( instanceId, getInputsAndOutputs: true, cts.Token); ``` ::: zone-end ::: zone pivot="python" ### `worker.py` To demonstrate [the fan-out/fan-in pattern](../durable-functions-overview.md#fan-in-out), the worker project orchestration creates parallel activity tasks and waits for all to complete. The orchestrator: 1. Receives a list of work items as input. 1. It "fans out" by creating parallel tasks for each work item (calling `process_work_item` for each one). 1. It waits for all tasks to complete using `task.when_all`. 1. It then "fans in" by aggregating the results with the `aggregate_results` activity. 1. The final aggregated result is returned to the client. Using fan-out/fan-in, the orchestration creates parallel activity tasks and waits for all to complete. ```csharp # Orchestrator function def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict: logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items") # Fan out: Create a task for each work item parallel_tasks = [] for item in work_items: parallel_tasks.append(ctx.call_activity("process_work_item", input=item)) # Wait for all tasks to complete logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete") results = yield task.when_all(parallel_tasks) # Fan in: Aggregate all the results logger.info("All parallel tasks completed, aggregating results") final_result = yield ctx.call_activity("aggregate_results", input=results) return final_result ``` ### `client.py` The client project: - Uses the same connection string logic as the worker. - Creates a list of work items to be processed in parallel. - Schedules an orchestration instance with the list as input. - Waits for the orchestration to complete and displays the aggregated results. - Uses `wait_for_orchestration_completion` for efficient polling. ```python # Generate work items (default 10 items if not specified) count = int(sys.argv[1]) if len(sys.argv) > 1 else 10 work_items = list(range(1, count + 1)) logger.info(f"Starting new fan out/fan in orchestration with {count} work items") # Schedule a new orchestration instance instance_id = client.schedule_new_orchestration( "fan_out_fan_in_orchestrator", input=work_items ) logger.info(f"Started orchestration with ID = {instance_id}") # Wait for orchestration to complete logger.info("Waiting for orchestration to complete...") result = client.wait_for_orchestration_completion( instance_id, timeout=60 ) ``` ::: zone-end ::: zone pivot="java" To demonstrate [the fan-out/fan-in pattern](../durable-functions-overview.md#fan-in-out), the `FanOutFanInPattern` project orchestration creates parallel activity tasks and waits for all to complete. The orchestrator: 1. Takes a list of work items as input. 1. Fans out by creating a separate task for each work item using ``. 1. Executes all tasks in parallel. 1. Waits for all tasks to complete using ``. 1. Fans in by aggregating all individual results using ``. 1. Returns the final aggregated result to the client. The project contains: - **`DurableTaskSchedulerWorkerExtensions` worker**: Defines the orchestrator and activity functions. - **`DurableTaskSchedulerClientExtension` client**: Sets up the worker host with proper connection string handling. ### Worker Using fan-out/fan-in, the orchestration creates parallel activity tasks and waits for all to complete. ```java DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString) .addOrchestration(new TaskOrchestrationFactory() { @Override public String getName() { return "FanOutFanIn_WordCount"; } @Override public TaskOrchestration create() { return ctx -> { List<?> inputs = ctx.getInput(List.class); List<Task<Integer>> tasks = inputs.stream() .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class)) .collect(Collectors.toList()); List<Integer> allWordCountResults = ctx.allOf(tasks).await(); int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum(); ctx.complete(totalWordCount); }; } }) .addActivity(new TaskActivityFactory() { @Override public String getName() { return "CountWords"; } @Override public TaskActivity create() { return ctx -> { String input = ctx.getInput(String.class); StringTokenizer tokenizer = new StringTokenizer(input); return tokenizer.countTokens(); }; } }) .build(); // Start the worker worker.start(); ``` ### Client The client project: - Uses the same connection string logic as the worker. - Creates a list of work items to be processed in parallel. - Schedules an orchestration instance with the list as input. - Waits for the orchestration to complete and displays the aggregated results. - Uses `waitForInstanceCompletion` for efficient polling. ```java DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build(); // The input is an arbitrary list of strings. List<String> listOfStrings = Arrays.asList( "Hello, world!", "The quick brown fox jumps over the lazy dog.", "If a tree falls in the forest and there is no one there to hear it, does it make a sound?", "The greatest glory in living lies not in never falling, but in rising every time we fall.", "Always remember that you are absolutely unique. Just like everyone else."); // Schedule an orchestration which will reliably count the number of words in all the given sentences. String instanceId = client.scheduleNewOrchestrationInstance( "FanOutFanIn_WordCount", new NewOrchestrationInstanceOptions().setInput(listOfStrings)); logger.info("Started new orchestration instance: {}", instanceId); // Block until the orchestration completes. Then print the final status, which includes the output. OrchestrationMetadata completedInstance = client.waitForInstanceCompletion( instanceId, Duration.ofSeconds(30), true); logger.info("Orchestration completed: {}", completedInstance); logger.info("Output: {}", completedInstance.readOutputAs(int.class)); ``` ::: zone-end ## Next steps Now that you've run the sample locally using the Durable Task Scheduler emulator, try creating a scheduler and task hub resource and deploying to Azure Container Apps. > [!div class="nextstepaction"] > [Deploy Durable Task Scheduler hosted on Azure Container Apps to Azure](./quickstart-container-apps-durable-task-sdk.md)
Success! Branch created successfully. Create Pull Request on GitHub
Error: