I really appreciate AWS services and one of the main reasons for that is the facility to implement event-driven systems. One of the interesting use cases of these events is related to the EMR service, responsible for running Apache Spark pipelines. In this post I will try to associate an action invoked every time an EMR step completes successfully.
A virtual conference at the intersection of Data and AI. This is not a conference for the hype. Its real users talking about real experiences.
- 40+ speakers with the likes of Hannes from Duck DB, Sol Rashidi, Joe Reis, Sadie St. Lawrence, Ryan Wolf from nvidia, Rebecca from lidl
- 12th September 2024
- Three simultaneous tracks
- Panels, Lighting Talks, Keynotes, Booth crawls, Roundtables and Entertainment.
- Topics include (ingestion, finops for data, data for inference (feature platforms), data for ML observability
- 100% virtual and 100% free
👉 Register here
I will start by explaining the CloudWatch events that can be used to monitor the state of EMR cluster. In the next part, I will write a simple Lambda function and show how to configure it with AWS CloudWatch events to monitor steps state.
EMR and CloudWatch events
All the events eligible for our Lambda's monitoring are listed in the Events part of the cluster's interface. You will find there every information about steps execution and cluster hardware, like for instance:
- Step s-1111 ("step example name 2") in Amazon EMR cluster j-1234T (test-emr-cluster) started running at 2019-01-01 10:55 UTC.
- Amazon EMR cluster j-1234T (test-emr-cluster) finished running all pending steps at 2019-01-01 10:41 UTC.
- Step s-1000 ("step example name") was added to Amazon EMR cluster j-1234T (test-emr-cluster) at 2019-01-01 10:26 UTC and is pending execution.
You can find the exhaustive list of events in the link to the AWS documentation from "Read also" section.
The EMR service automatically sends these events to a CloudWatch event stream. Therefore, you can use them exactly in the same way as other CloudWatch events, namely by creating the rule with a matching pattern. Let's go to the next section to see how to implement it.
Catching EMR step events with Lambda
Our monitoring Lambda will listen for 1 event which is the step successful termination. For the sake of simplicity, I will only print the step id but obviously, you can do much more interesting things like triggering an action on another service and so forth. Just take care to define the appropriate permissions to your Lambda.
The code starts here with the definition of the function in Serverless Framework file. I omitted some of unmeaningful code for the readability:
successful-step-printer: name: successful-step-printer-emr-test description: Prints a message when an EMR step terminates successfully handler: main.handle_step_change memorySize: 1024 timeout: 300 events: - cloudwatchEvent: description: 'CloudWatch Event triggered on EMR Instance pending state' event: source: - "aws.emr" detail-type: - "EMR Step Status Change" detail: state: - COMPLETED
Nothing complicated here. Serverless automatically gives permission to call the Lambda from the CloudWatch rule. If you want, you can also apply a finer level of control and specify the cluster whose events you want to listen for.
Otherwise, the function's code looks simply like that:
def terminate_step(event): print('Step {} terminated'.format(event['detail']['stepId'])) def handle_step_change(event, context): strategies = { 'EMR Step Status Change_COMPLETED': terminate_step } event_type = event['detail-type'] step_state = event['detail']['state'] detail_code = '{}_{}'.format(event_type, step_state) strategies[detail_code](event)
You can see that everything it does is to retrieve the interesting details of the event in order to build a key. The key is used later to retrieve the appropriate function to call. I prepared that to illustrate how, if you want, you could manage different events inside the same function. Of course, it's simpler to have one function per event but if you can't do that, the strategy design pattern from the snippet is one of the acceptable solutions.
Of course, this post is only a proof of the concept that we can react to EMR cluster events. Maybe you won't need it very often, but it's a way to build a self-healing service able to restart in case of human and technical errors. Also, the events coming from the steps can be used to trigger other actions and not necessarily Apache Spark processing. On the other side, such decentralized architecture may be difficult to monitor and if you want to get an idea how to solve it, I will publish soon a post about orchestration and choreography.