Azure Durable Functions

When I first heard about Durable Functions, my reaction was: "So cool! We can now build fully serverless streaming stateful pipelines!" Indeed we can, but that's not the only thing they offer!

Durable functions - easy view

A generic Durable Functions workflow looks like the following schema:

Everything starts with a trigger function. In the schema, it reads some streaming data, but you can also trigger it explicitly from an HTTP endpoint. Unsurprisingly, its responsibility is to intercept an event and then start the workflow.

In the next step, an orchestration function starts coordinating all the tasks that compose the workflow. An interesting thing to notice here is how the workflow is represented. It's not expressed, at least in the code, as a set of Azure Function bindings. It's simply a method invocation, a little bit like an RPC call.

If we translated it to a data processing framework, so far, we've only seen the input reading and execution plan stages. But nothing has been done yet! The work starts after executing the orchestration logic. At this moment, the orchestrator starts one or multiple activities.
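To make the "plan first, work later" idea more concrete, here is a minimal, framework-free sketch. It does not use the Durable Functions SDK, and `ACTIVITIES`, `orchestrator`, and `run_orchestration` are hypothetical names chosen for illustration. The orchestrator is a plain Python generator that only describes which activity to call next, while a toy driver executes each requested activity and sends the result back:

```python
from typing import Any, Callable, Dict, Generator, Tuple

# Hypothetical registry of activities: name -> plain Python callable.
ACTIVITIES: Dict[str, Callable[[Any], Any]] = {
    "shout": lambda text: text.upper(),
    "count": lambda text: len(text),
}

# An activity request is just (activity_name, activity_input).
ActivityCall = Tuple[str, Any]


def orchestrator(payload: str) -> Generator[ActivityCall, Any, list]:
    """Describes the workflow; it does not execute any activity itself."""
    shouted = yield ("shout", payload)
    length = yield ("count", shouted)
    return [shouted, length]


def run_orchestration(workflow: Generator[ActivityCall, Any, list]) -> list:
    """Toy driver: runs each yielded activity and sends its result back."""
    try:
        name, arg = next(workflow)
        while True:
            result = ACTIVITIES[name](arg)
            name, arg = workflow.send(result)
    except StopIteration as finished:
        # The generator's return value travels in StopIteration.value.
        return finished.value


print(run_orchestration(orchestrator("hello")))  # ['HELLO', 5]
```

The real framework adds persistence and scheduling around this loop, but the shape of the orchestrator code, a generator yielding activity calls, is the same.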

Along with the activities, the workflow can also have entity functions. They differ from activities in two ways: they manage their state explicitly and they don't require an orchestrator. The trigger function can call them directly as well!

Durable functions - complete view

But that was only the easy view of Durable Functions. To manage the orchestration and the state, the framework relies on a storage component, which by default is Azure Storage:

Each function app has a component called a task hub. With the default provider, it lives in a Storage Account and is composed of a queue, a table, and a blob. Each of these services has a dedicated role in the workflow. The queue ensures communication between the functions. The table stores the orchestration state, such as the status of the activity or entity functions. It's also the place for the entity state, as long as the payload is smaller than the storage limit. When the payload exceeds that limit, the framework stores the state in the blob and keeps only a reference to it in the table.
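The "table first, blob when too large" rule can be sketched in a few lines. This is only an illustration of the idea, not the framework's implementation: plain dictionaries stand in for the table and the blob container, and `MAX_INLINE_BYTES`, `save_state`, and `load_state` are hypothetical names with an arbitrary threshold.

```python
import json
from typing import Any, Dict

# Hypothetical threshold for illustration only; not the real Azure limit.
MAX_INLINE_BYTES = 1024

table: Dict[str, Dict[str, Any]] = {}  # stands in for the table
blobs: Dict[str, bytes] = {}           # stands in for the blob container


def save_state(entity_key: str, state: Any) -> None:
    """Store small state inline; spill large state to a blob and keep a reference."""
    payload = json.dumps(state).encode("utf-8")
    if len(payload) <= MAX_INLINE_BYTES:
        table[entity_key] = {"inline": payload}
    else:
        blob_name = f"{entity_key}.json"
        blobs[blob_name] = payload
        table[entity_key] = {"blob_ref": blob_name}


def load_state(entity_key: str) -> Any:
    """Read the table row, following the blob reference when there is one."""
    row = table[entity_key]
    payload = row["inline"] if "inline" in row else blobs[row["blob_ref"]]
    return json.loads(payload.decode("utf-8"))


save_state("small", {"counter": 2})
save_state("large", {"text": "x" * 5000})
print(list(table["small"]), list(table["large"]))  # ['inline'] ['blob_ref']
```

Readers of the state never need to know where the payload physically lives, which is why `load_state` hides the indirection.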

Azure Storage is the default and, as of this writing, the most mature storage provider. However, Microsoft is working on more efficient backends such as Netherite.

Pipeline example

I could have written much more about the theoretical parts of the Durable Functions, but it's not the goal. With the information presented so far, you should be able to understand an example of the framework. I'm going to use the Storage Emulator so that you can play around with the code even if you don't have an Azure subscription.

The entry point for my example will be an HTTP-triggered function. It will do 2 things. First, it will directly call an entity function to demonstrate the session state management. After that, it will trigger an orchestrator:

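import azure.functions as func
from azure.durable_functions import DurableOrchestrationClient, EntityId


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    run_id = req.params.get('run_id')

    client = DurableOrchestrationClient(starter)
    # Signal the entity directly, without going through an orchestrator
    entity_state_key = f'my_counter_{run_id}'
    entity_id = EntityId(name='session_handler', key=entity_state_key)
    await client.signal_entity(entity_id, "add", 2)
    await client.signal_entity(entity_id, "get")

    # Start the orchestration with an explicit instance id and a dict as input
    instance_id = f'instance{run_id}'
    await client.start_new('message_orchestrator', instance_id, {'hellos_count': req.params.get('hellos_count')})

    return func.HttpResponse(status_code=202)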

The entity function has 2 goals. First, it reads and writes the state with the available getters and setters. Second, its state grows to a size that can no longer be stored as a serialized entry in Table Storage. Instead, the state goes to a Blob object and the table entry becomes a reference to that Blob. I omitted the state text for brevity, but you can replace it with 11 paragraphs of generated "Lorem ipsum..." text.
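For orientation, here is a minimal sketch of what such an entity could look like. It is an illustration built on assumptions, not the code from the repository: the state layout (a `counter` plus a growing `text`) is invented, and `FakeEntityContext` is a hypothetical stand-in that mimics only the few `DurableEntityContext` members used here, so that the logic can run locally without the SDK.

```python
def entity_function(context) -> None:
    """Counter-like entity handling the "add" and "get" operations."""
    # Assumed state layout: a counter and a text that grows on every "add".
    state = context.get_state(lambda: {"counter": 0, "text": ""})
    operation = context.operation_name
    if operation == "add":
        state["counter"] += context.get_input()
        # Placeholder for the long "Lorem ipsum..." text mentioned in the article.
        state["text"] += "Lorem ipsum... "
    elif operation == "get":
        context.set_result(state)
    context.set_state(state)

# In a real function app, the entity would be exposed with:
#   import azure.durable_functions as df
#   main = df.Entity.create(entity_function)


class FakeEntityContext:
    """Hypothetical stand-in for df.DurableEntityContext, for local runs only."""

    def __init__(self, operation_name, operation_input=None, state=None):
        self.operation_name = operation_name
        self.operation_input = operation_input
        self.state = state
        self.result = None

    def get_state(self, initializer):
        return self.state if self.state is not None else initializer()

    def get_input(self):
        return self.operation_input

    def set_state(self, state):
        self.state = state

    def set_result(self, result):
        self.result = result


# Replays the two signals sent by the HTTP trigger: "add" 2, then "get".
add_ctx = FakeEntityContext("add", 2)
entity_function(add_ctx)
get_ctx = FakeEntityContext("get", state=add_ctx.state)
entity_function(get_ctx)
print(get_ctx.result["counter"])  # 2
```

Keeping the entity logic free of SDK imports, as above, also makes it easy to unit test before deploying it.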

When it comes to the orchestrator, it'll call an activity function twice, each time with a different parameter. The activity function will simply return a "Hello world"-like text:

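import logging
import azure.durable_functions as df

# Orchestrator function, started above under the name 'message_orchestrator'
def orchestrator_function(context: df.DurableOrchestrationContext):
    input_params = context.get_input() # this time, it's a dict!
    logging.info(f'Got input {input_params}')
    number_of_hellos = input_params['hellos_count']
    result1 = yield context.call_activity('message_logger', f'Text 1 - {number_of_hellos}')
    result2 = yield context.call_activity('message_logger', f'Text 2 - {number_of_hellos}')
    return [result1, result2]

main = df.Orchestrator.create(orchestrator_function)

# Activity function 'message_logger', defined in its own module
def main(name: str) -> str:
    return f"Hello{name}!"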

The code is available in my GitHub wfc-playground/azure-durable-function project. You can see it in action in the demo just below:

Durable Functions are a native way to perform data orchestration tasks or stateful processing with Azure Functions. The solution leverages other Azure services to keep the state of the entities and of the orchestration runs.