Skip to content

Latest commit

 

History

History

emr_job

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 

Amazon Managed Workflows for Apache Airflow (MWAA) and Amazon EMR

Use Amazon Managed Workflows for Apache Airflow (MWAA) to design and run a serverless workflow that coordinates Amazon Elastic Map Reduce (EMR) jobs. Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto.

Versions Supported

Apache Airflow 1.10.12 on Amazon MWAA

Setup

Copy the file into your DAGs folder, and ensure you have connectivity to Amazon EMR. For the latter, When using an AWS IAM role to connect to Amazon EMR, either with Amazon MWAA’s Execution Role or an assumed role in Amazon EC2, you will need to provide AmazonElasticMapReduceFullAccess Policy access to that role via the AWS IAM console. You will also need Amazon EMR configured on your AWS account.

Files

Requirements.txt needed

If using the 2.0 version on 1.10.12 use:

Plugins needed

None.

Explanation

All DAGs begin with the default import line:

from airflow import DAG

Then we will import the operators and sensors we will use in our workflow:

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

We’ll then import a couple of time utilities

from airflow.utils.dates import days_ago
from datetime import timedelta

Now we’ll define some default arguments to use in our tasks:

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
}

Next we’ll define what our EMR cluster should look like. This will use the mykeypair Key Pair, EMR_DefaultRole, and EMR_EC2_DefaultRole created above

JOB_FLOW_OVERRIDES = {
    'Name': 'my-demo-cluster',
    'ReleaseLabel': 'emr-5.30.1',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],    
    'Instances': {
        'InstanceGroups': [
            {
                'Name': "Master nodes",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': "Slave nodes",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
        'Ec2KeyName': 'mykeypair',
    },
    'VisibleToAllUsers': True,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole'
}

We’ll also need to define the EMR job step we wish to run. In this case we’ll use the built-in calculate_pi sample

SPARK_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }
]

Next we’ll create our DAG object:

with DAG(
    dag_id='emr-job-dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(1),
    schedule_interval='@once',
    tags=['emr'],
) as dag:

We’ll define an EmrCreateJobFlowOperator that will create a new cluster

    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow', 
        emr_conn_id='aws_default', 
        job_flow_overrides=JOB_FLOW_OVERRIDES
    )

Then an EmrAddStepsOperator to add our calculate_pi step

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

And an EmrStepSensor that will wait for our step to complete

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

Finally we have to tell our DAG what order to run our operators

cluster_creator >> step_adder >> step_checker 

Security

See CONTRIBUTING for more information.

License

This library is licensed under the MIT-0 License. See the LICENSE file.