Listening to EMR events with AWS Lambda

I really appreciate AWS services, and one of the main reasons is how easy they make it to implement event-driven systems. One interesting use case for these events involves the EMR service, which is responsible for running Apache Spark pipelines. In this post I will show how to trigger an action every time an EMR step completes successfully.

Bartosz Konieczny

I will start by explaining the CloudWatch events that can be used to monitor the state of an EMR cluster. In the next part, I will write a simple Lambda function and show how to configure it with AWS CloudWatch events to monitor step state changes.

EMR and CloudWatch events

All the events eligible for our Lambda's monitoring are listed in the Events tab of the cluster's interface. There you will find information about step execution and cluster hardware, among other things.

You can find the exhaustive list of events in the AWS documentation linked in the "Read also" section.

The EMR service automatically sends these events to a CloudWatch event stream. Therefore, you can use them in exactly the same way as other CloudWatch events, namely by creating a rule with a matching pattern. Let's go to the next section to see how to implement it.
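To give an idea of what such an event looks like, here is a trimmed-down sample of an EMR step status change payload. The field values are placeholders, and a real event carries additional metadata:

```json
{
    "version": "0",
    "source": "aws.emr",
    "detail-type": "EMR Step Status Change",
    "region": "eu-west-1",
    "detail": {
        "clusterId": "j-XXXXXXXXXXXXX",
        "stepId": "s-XXXXXXXXXXXXX",
        "name": "spark-job",
        "state": "COMPLETED"
    }
}
```

The `source` and `detail-type` attributes are the ones matched by the CloudWatch rule, while the `detail` map carries the event-specific fields.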

Catching EMR step events with Lambda

Our monitoring Lambda will listen for a single event: the successful completion of a step. For the sake of simplicity, I will only print the step ID but obviously, you can do much more interesting things, like triggering an action on another service. Just take care to grant the appropriate permissions to your Lambda.
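For instance, if the function were to publish a notification to an SNS topic, the extra permission could be declared in the Serverless Framework file. This is only a sketch; the topic ARN is a hypothetical placeholder:

```yaml
provider:
  name: aws
  runtime: python3.8
  iamRoleStatements:
    # Hypothetical permission letting the function publish to an SNS topic
    - Effect: Allow
      Action:
        - sns:Publish
      Resource: "arn:aws:sns:eu-west-1:123456789012:emr-step-alerts"
```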

The code starts here with the definition of the function in the Serverless Framework file. I omitted some irrelevant parts for readability:

  successful-step-printer:
    name: successful-step-printer-emr-test
    description: Prints a message when an EMR step terminates successfully
    handler: main.handle_step_change
    memorySize: 1024
    timeout: 300
    events:
      - cloudwatchEvent:
          description: 'CloudWatch Event triggered on EMR step successful completion'
          event:
            source:
              - "aws.emr"
            detail-type:
              - "EMR Step Status Change"
            detail:
              state:
                - COMPLETED

Nothing complicated here. Serverless automatically grants the CloudWatch rule permission to invoke the Lambda. If you want, you can also apply a finer level of control and specify the cluster whose events you want to listen for.
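For example, the event pattern can be narrowed down with the clusterId field present in EMR event details; the cluster id below is a placeholder:

```yaml
event:
    source:
        - "aws.emr"
    detail-type:
        - "EMR Step Status Change"
    detail:
        state:
            - COMPLETED
        # Only match events emitted by this cluster (placeholder id)
        clusterId:
            - "j-XXXXXXXXXXXXX"
```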

Otherwise, the function's code simply looks like this:

def terminate_step(event):
    print('Step {} terminated'.format(event['detail']['stepId']))

def handle_step_change(event, context):
    # Map an "<event type>_<state>" key to the function handling that case
    strategies = {
        'EMR Step Status Change_COMPLETED': terminate_step
    }
    event_type = event['detail-type']
    step_state = event['detail']['state']
    detail_code = '{}_{}'.format(event_type, step_state)
    strategies[detail_code](event)

You can see that all it does is retrieve the interesting details of the event in order to build a key. The key is then used to retrieve the appropriate function to call. I prepared it that way to illustrate how, if needed, you could manage different events inside the same function. Of course, it's simpler to have one function per event but if you can't do that, the strategy design pattern from the snippet is one of the acceptable solutions.
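To check the dispatch logic locally, the handler can be fed a trimmed-down sample event containing only the fields it reads; the step id is a placeholder:

```python
def terminate_step(event):
    print('Step {} terminated'.format(event['detail']['stepId']))

def handle_step_change(event, context):
    # Map an "<event type>_<state>" key to the function handling that case
    strategies = {
        'EMR Step Status Change_COMPLETED': terminate_step
    }
    event_type = event['detail-type']
    step_state = event['detail']['state']
    detail_code = '{}_{}'.format(event_type, step_state)
    strategies[detail_code](event)

# Minimal local invocation with only the fields the handler reads;
# a real CloudWatch event carries more metadata.
sample_event = {
    'detail-type': 'EMR Step Status Change',
    'detail': {'stepId': 's-1234567890ABC', 'state': 'COMPLETED'}
}
handle_step_change(sample_event, context=None)
# prints: Step s-1234567890ABC terminated
```

Since the Lambda runtime passes the same `(event, context)` pair, a local call like this exercises exactly the code path the CloudWatch rule will trigger.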

Of course, this post is only a proof of concept showing that we can react to EMR cluster events. Maybe you won't need it very often, but it's a way to build a self-healing service able to restart in case of human and technical errors. Also, the events coming from the steps can be used to trigger other actions, not necessarily Apache Spark processing. On the other hand, such a decentralized architecture may be difficult to monitor; if you want to get an idea of how to solve that, I will soon publish a post about orchestration and choreography.
