Introduction
In serverless computing, parallel processing is essential to handle large workloads efficiently. The fan-out/fan-in pattern represents a popular approach for parallel processing, where it splits a single operation into multiple sub-operations, executes them in parallel, and merges their results to produce the final output. Durable Functions in Azure provides a powerful way to implement the fan-out/fan-in pattern.
In this blog post, we will show you how to use Durable Functions to implement the fan-out/fan-in pattern using an example. We will walk you through the code for an orchestrator function that calls two activity functions in parallel and waits for their results to be returned before continuing.
Let’s consider an example where we have to perform the following tasks in parallel:
- Get all categories
- Save each category with a new ID and updated name
- Log and echo the updated categories
Demo
Here is the code for the Orchestrator Function that uses Durable Functions to implement the fan-out/fan-in pattern:
[FunctionName("OrchestratorFunction")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
// Call GetAllCategories function
var categories = await context.CallActivityAsync<List<Category>>("GetAllCategoriesActivity", null);
// Loop through each category and call SaveCategoryFunction
var tasks = new List<Task>();
var maxId = categories.Max(x => x.Id) + 1;
foreach (var category in categories)
{
category.Id = maxId;
category.Name = $"{category.Name}-{maxId}";
tasks.Add(context.CallActivityAsync("SaveCategoryFunction", category));
++maxId;
}
// Wait for all SaveCategoryFunction calls to finish
await Task.WhenAll(tasks);
// Call LogAndEchoFunction passing the data from GetAllCategories
await context.CallActivityAsync("LogAndEchoFunction", categories);
}
The code for the Orchestrator Function is straightforward. It first calls the GetAllCategoriesActivity
function to get all categories. Then, it loops through each category and calls the SaveCategoryFunction
in parallel. The Task.WhenAll
method is used to wait for all the SaveCategoryFunction
calls to complete. Finally, it calls the LogAndEchoFunction
function, passing the data from GetAllCategories
.
Here is the complete code.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace LearnSmartCoding_AZ204_Functions_Demo
{
public static class OrchestratorFunction
{
private static readonly HttpClient _httpClient = new HttpClient();
[FunctionName("OrchestratorFunction")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
// Call GetAllCategories function
var categories = await context.CallActivityAsync<List<Category>>("GetAllCategoriesActivity", null);
// Loop through each category and call SaveCategoryFunction
var tasks = new List<Task>();
var maxId = categories.Max(x => x.Id) + 1;
foreach (var category in categories)
{
category.Id = maxId;
category.Name = $"{category.Name}-{maxId}";
tasks.Add(context.CallActivityAsync("SaveCategoryFunction", category));
++maxId;
}
// Wait for all SaveCategoryFunction calls to finish
await Task.WhenAll(tasks);
// Call LogAndEchoFunction passing the data from GetAllCategories
await context.CallActivityAsync("LogAndEchoFunction", categories);
}
[FunctionName("OrchestratorFunction_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
// Call the OrchestratorFunction, passing in the instance ID
var result = await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(req, instanceId, TimeSpan.FromSeconds(10));
return result;
}
[FunctionName("SaveCategoryFunction")]
public static async Task SaveCategoryFunction(
[ActivityTrigger] IDurableActivityContext context,
ILogger log)
{
log.LogInformation("SaveCategoryFunction processed a request.");
Category category = context.GetInput<Category>();
log.LogInformation($"Category name: {category.Name}");
// Save the category to the specified URL
using (var httpClient = new HttpClient())
{
var json = JsonConvert.SerializeObject(category);
var content = new StringContent(json, Encoding.UTF8, "application/json");
httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
var response = await httpClient.PostAsync("https://essentialproducts-api.azurewebsites.net/api/Category", content);
if (!response.IsSuccessStatusCode)
{
log.LogError($"Error saving category. Status code: {response.StatusCode}");
throw new Exception("Error saving category.");
}
}
}
[FunctionName("DeleteCategory")]
public static async Task<IActionResult> DeleteCategory(
[ActivityTrigger] string id,
ILogger log)
{
using (var httpClient = new HttpClient())
{
var apiUrl = $"https://essentialproducts-api.azurewebsites.net/api/Category/{id}";
var response = await httpClient.DeleteAsync(apiUrl);
if (response.StatusCode == HttpStatusCode.NotFound)
{
return new NotFoundResult();
}
else if (!response.IsSuccessStatusCode)
{
return new StatusCodeResult((int)response.StatusCode);
}
return new OkResult();
}
}
[FunctionName("GetAllCategoriesActivity")]
public static async Task<List<Category>> GetAllCategoriesActivity(
[ActivityTrigger] object input,
ILogger log)
{
try
{
var response = await _httpClient.GetAsync("https://essentialproducts-api.azurewebsites.net/api/Category/All");
response.EnsureSuccessStatusCode();
var content = await response.Content.ReadAsStringAsync();
var categories = JsonConvert.DeserializeObject<Category[]>(content);
return categories.ToList();
}
catch (HttpRequestException ex)
{
log.LogError(ex, "An error occurred while calling the API.");
return new List<Category>();
}
}
[FunctionName("LogAndEchoFunction")]
public static IActionResult RunLogAndEchoFunction(
[ActivityTrigger] IDurableActivityContext context,
ILogger log)
{
log.LogInformation("C# Activity function processed a request.");
var data = context.GetInput<List<Category>>();
// Log the data
var dataJson = JsonConvert.SerializeObject(data);
log.LogInformation($"Received data: {dataJson}");
// Return the same data back to the caller
return new OkObjectResult(data);
}
public class Category
{
public int Id { get; set; }
public string Name { get; set; }
public bool IsActive { get; set; }
}
}
}
In summary, the OrchestratorFunction
function demonstrates the use of the fan-out/fan-in pattern in Durable Functions to execute a series of dependent steps in parallel. The function retrieves a list of categories, creates a new ID for each category, and saves the modified categories in parallel. Finally, the function calls an activity function to log the results of the operation.
Application Source Code @ LearnSmartCoding GitHub
Check out other topics that might interest you.
- Automating API Requests with C# and Azure Functions using HttpTrigger.
- Getting Started with Azure Durable Functions: An Example of Function Chaining
- Building an Azure Function with Input Binding to Read and Process Excel Files from Azure Blob Storage
Conclusion
Developers can use Durable Functions to build complex workflows in a serverless environment, and they can leverage the fan-out/fan-in pattern, among many other patterns, to accomplish specific tasks.
With Durable Functions, developers can create reliable and scalable serverless workflows for a variety of use cases, from data processing to event-driven automation.