Using Durable Functions for Fan-Out/Fan-In Pattern

Introduction

In serverless computing, parallel processing is essential to handle large workloads efficiently. The fan-out/fan-in pattern represents a popular approach for parallel processing, where it splits a single operation into multiple sub-operations, executes them in parallel, and merges their results to produce the final output. Durable Functions in Azure provides a powerful way to implement the fan-out/fan-in pattern.

In this blog post, we will show you how to use Durable Functions to implement the fan-out/fan-in pattern using an example. We will walk you through the code for an orchestrator function that calls two activity functions in parallel and waits for their results to be returned before continuing.

Let’s consider an example where we have to perform the following tasks in parallel:

  1. Get all categories
  2. Save each category with a new ID and updated name
  3. Log and echo the updated categories

Demo

Here is the code for the Orchestrator Function that uses Durable Functions to implement the fan-out/fan-in pattern:

[FunctionName("OrchestratorFunction")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // Call GetAllCategories function
    var categories = await context.CallActivityAsync<List<Category>>("GetAllCategoriesActivity", null);

    // Loop through each category and call SaveCategoryFunction
    var tasks = new List<Task>();
    var maxId = categories.Max(x => x.Id) + 1;
    foreach (var category in categories)
    {
        category.Id = maxId;
        category.Name = $"{category.Name}-{maxId}";
        tasks.Add(context.CallActivityAsync("SaveCategoryFunction", category));
        ++maxId;
    }

    // Wait for all SaveCategoryFunction calls to finish
    await Task.WhenAll(tasks);

    // Call LogAndEchoFunction passing the data from GetAllCategories
    await context.CallActivityAsync("LogAndEchoFunction", categories);
}

The code for the Orchestrator Function is straightforward. It first calls the GetAllCategoriesActivity function to get all categories. Then, it loops through each category and calls the SaveCategoryFunction in parallel. The Task.WhenAll method is used to wait for all the SaveCategoryFunction calls to complete. Finally, it calls the LogAndEchoFunction function, passing the data from GetAllCategories.

Here is the complete code.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace LearnSmartCoding_AZ204_Functions_Demo
{
    public static class OrchestratorFunction
    {
        private static readonly HttpClient _httpClient = new HttpClient();

        [FunctionName("OrchestratorFunction")]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            // Call GetAllCategories function
            var categories = await context.CallActivityAsync<List<Category>>("GetAllCategoriesActivity", null);

            // Loop through each category and call SaveCategoryFunction
            var tasks = new List<Task>();
            var maxId = categories.Max(x => x.Id) + 1;
            foreach (var category in categories)
            {
                category.Id = maxId;
                category.Name = $"{category.Name}-{maxId}";
                tasks.Add(context.CallActivityAsync("SaveCategoryFunction", category));
                ++maxId;
            }

            // Wait for all SaveCategoryFunction calls to finish
            await Task.WhenAll(tasks);

            // Call LogAndEchoFunction passing the data from GetAllCategories
            await context.CallActivityAsync("LogAndEchoFunction", categories);
        }       

        [FunctionName("OrchestratorFunction_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
        {
            // Function input comes from the request content.
            string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            // Call the OrchestratorFunction, passing in the instance ID
            var result = await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(req, instanceId, TimeSpan.FromSeconds(10));


            return result;
        }

        [FunctionName("SaveCategoryFunction")]
        public static async Task SaveCategoryFunction(
    [ActivityTrigger] IDurableActivityContext context,
    ILogger log)
        {
            log.LogInformation("SaveCategoryFunction processed a request.");

            Category category = context.GetInput<Category>();

            log.LogInformation($"Category name: {category.Name}");

            // Save the category to the specified URL
            using (var httpClient = new HttpClient())
            {
                var json = JsonConvert.SerializeObject(category);
                var content = new StringContent(json, Encoding.UTF8, "application/json");
                httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                var response = await httpClient.PostAsync("https://essentialproducts-api.azurewebsites.net/api/Category", content);

                if (!response.IsSuccessStatusCode)
                {
                    log.LogError($"Error saving category. Status code: {response.StatusCode}");
                    throw new Exception("Error saving category.");
                }
            }
        }

        [FunctionName("DeleteCategory")]
        public static async Task<IActionResult> DeleteCategory(
    [ActivityTrigger] string id,
    ILogger log)
        {
            using (var httpClient = new HttpClient())
            {
                var apiUrl = $"https://essentialproducts-api.azurewebsites.net/api/Category/{id}";
                var response = await httpClient.DeleteAsync(apiUrl);
                if (response.StatusCode == HttpStatusCode.NotFound)
                {
                    return new NotFoundResult();
                }
                else if (!response.IsSuccessStatusCode)
                {
                    return new StatusCodeResult((int)response.StatusCode);
                }
                return new OkResult();
            }
        }

        [FunctionName("GetAllCategoriesActivity")]
        public static async Task<List<Category>> GetAllCategoriesActivity(
     [ActivityTrigger] object input,
     ILogger log)
        {
            try
            {
                var response = await _httpClient.GetAsync("https://essentialproducts-api.azurewebsites.net/api/Category/All");
                response.EnsureSuccessStatusCode();

                var content = await response.Content.ReadAsStringAsync();
                var categories = JsonConvert.DeserializeObject<Category[]>(content);

                return categories.ToList();
            }
            catch (HttpRequestException ex)
            {
                log.LogError(ex, "An error occurred while calling the API.");
                return new List<Category>();
            }
        }


         [FunctionName("LogAndEchoFunction")]
        public static IActionResult RunLogAndEchoFunction(
         [ActivityTrigger] IDurableActivityContext context,
         ILogger log)
        {
            log.LogInformation("C# Activity function processed a request.");

            var data =  context.GetInput<List<Category>>();
            // Log the data
            var dataJson = JsonConvert.SerializeObject(data);
            log.LogInformation($"Received data: {dataJson}");

            // Return the same data back to the caller
            return new OkObjectResult(data);
        }


        public class Category
        {
            public int Id { get; set; }
            public string Name { get; set; }
            public bool IsActive { get; set; }
        }
    }
}

In summary, the OrchestratorFunction function demonstrates the use of the fan-out/fan-in pattern in Durable Functions to execute a series of dependent steps in parallel. The function retrieves a list of categories, creates a new ID for each category, and saves the modified categories in parallel. Finally, the function calls an activity function to log the results of the operation.

Application Source Code @ LearnSmartCoding GitHub

Check out other topics that might interest you.

Conclusion

Developers can use Durable Functions to build complex workflows in a serverless environment, and they can leverage the fan-out/fan-in pattern, among many other patterns, to accomplish specific tasks.

With Durable Functions, developers can create reliable and scalable serverless workflows for a variety of use cases, from data processing to event-driven automation.

Leave a Reply

Your email address will not be published. Required fields are marked *

Verified by MonsterInsights