2026-01
使用 Amazon MWAA 编排 Amazon EMR Serverless Spark 作业,并
使用 Amazon MWAA 进行 Amazon EMR Serverless Spark 作业的编排及数据验证
关键要点
数据工程的复杂性在增加,组织需要高效的工作流来处理大规模数据。使用 Amazon MWAA 来简化数据管道的构建和管理,不用担心基础设施。Amazon EMR Serverless 提供灵活的成本解决方案,方便快速处理数据。Amazon Athena 支持无服务器架构,使用标准 SQL 进行数据分析。本文展示了如何结合这些服务创建一个完整的端到端数据处理管道。随着数据工程变得越来越复杂,组织们正在寻找新的方法来简化数据处理工作流。目前,许多数据工程师使用 Apache Airflow 来构建、调度和监控数据管道。然而,随着数据量的增加,管理和扩展这些管道可能变成一项艰巨的任务。使用 Amazon Managed Workflows for Apache AirflowAmazon MWAA可以帮助简化数据管道的构建、运行和管理。通过提供完全管理的平台,Amazon MWAA 使数据工程师可以专注于构建数据工作流,而无需担心基础设施。
现今,企业和组织需要高效且经济实惠的方式来处理大量数据。Amazon EMR Serverless 是一种能处理大规模数据的低成本扩展方案。Apache Airflow 中的 Amazon 提供商自带 EMR Serverless 操作符,并已包含在 Amazon MWAA 中,使数据工程师能够轻松构建可扩展且可靠的数据处理管道。您可以使用 EMR Serverless 来运行数据上的 Spark 作业,同时使用 Amazon MWAA 来管理这些作业之间的工作流和依赖关系。这种集成还可通过自动扩缩资源来帮助降低处理数据的成本。
Amazon Athena 是一种无服务器的、交互式分析服务,基于开源框架,支持开放的表和文件格式。您可以使用标准 SQL 与数据交互。Athena 使这一切变得可能,不需要管理复杂的基础架构。
在本文中,我们将使用 Amazon MWAA、EMR Serverless 和 Athena 构建一个完整的端到端数据处理管道。
解决方案概述
以下图示说明了解决方案架构。

工作流包括以下步骤:
创建一个 Amazon MWAA 工作流,从您的输入 Amazon Simple Storage ServiceAmazon S3存储桶中检索数据。使用 EMR Serverless 处理存储在 Amazon S3 中的数据。EMR Serverless 会根据工作负载自动扩展或缩减,因此您不用担心基础设施的预配或管理。使用 EMR Serverless 通过 PySpark 代码转换数据,然后将转换后的数据存回您的 S3 存储桶。使用 Athena 基于 S3 数据集创建外部表并运行查询以分析转换后的数据。Athena 使用 AWS Glue 数据目录存储表的元数据。前提条件
您需要具备以下前提条件:
一个 AWS 账户。对于 Amazon S3、用于运行 SQL 查询的 Athena、用于创建环境的 Amazon MWAA 以及 EMR Serverless 的基本理解。一个具有两个私有子网的 VPC。具备创建 Amazon MWAA 集群、创建 AWS Glue 数据目录及使用 Athena 运行 SQL 查询权限的 AWS Identity and Access ManagementIAM角色。具有运行 EMR Serverless 应用程序并可以从 S3 存储桶读取的 emrserverlessexecutionrole作业执行角色,以及对 Amazon MWAA 集群执行角色的 iamPassRole 权限。有关更多信息,请参见 EMR Serverless 示例、作业运行时角色 和 添加和删除 IAM 身份权限。数据准备
为了展示如何使用 EMR Serverless 作业结合 Amazon MWAA 以及通过 Athena 进行数据验证,我们使用公开可用的 NYC 出租车数据集。请将以下数据集下载到本地计算机:
绿色和黄色出租车行程记录 包含黄色和绿色出租车的行程记录信息,如接送日期时间、位置、行驶距离和付款方式。本示例中我们使用 2022 年最新的 Parquet 文件。出租车区域查找数据集 提供出租车对应位置 ID 和区域详细信息的数据集。在后续步骤中,我们将这些数据集上传到 Amazon S3。
创建解决方案资源
此部分概述了设置数据处理和转换的步骤。
创建 EMR Serverless 应用程序
您可以创建一个或多个使用开源分析框架如 Apache Spark 或 Apache Hive的 EMR Serverless 应用程序。与基于 EC2 的 EMR 不同,您不需要删除或终止 EMR Serverless 应用程序。EMR Serverless 应用程序仅仅是一个定义,一旦创建,可以根据需要重复使用。这使得 MWAA 工作流更加简单,您只需向预创建的 EMR Serverless 应用程序提交作业。
默认情况下,EMR Serverless 应用程序在作业提交时会自动启动,并在 15 分钟空闲后自动停止,以确保成本效益。您可以修改空闲时间的数量或选择关闭此功能。
要通过 EMR Serverless 控制台创建应用程序,请按照 “创建 EMR Serverless 应用程序” 的说明进行操作。 请记下应用程序 ID,因为我们将在后续步骤中使用它。
创建 S3 存储桶和文件夹
按照以下步骤设置您的 S3 存储桶和文件夹:
在 Amazon S3 控制台中,创建一个 S3 存储桶 以存储数据集。记下 S3 存储桶的名称,以便在后续步骤中使用。创建一个用于存储输入数据的 inputdata 文件夹。在该文件夹内,创建三个独立文件夹,分别用于每个数据集:green、yellow 和 zonelookup。您可以下载并使用最新的数据集。对于我们的测试,我们使用以下文件:
green/ 文件夹包含文件 greentripdata202206parquetyellow/ 文件夹包含文件 yellowtripdata202206parquetzonelookup/ 文件夹包含文件 taxizonelookupcsv设置 Amazon MWAA DAG 脚本
完成以下步骤以设置您的 DAG 脚本:
梯子免费版下载以下脚本到本地计算机:requirementstxt Python 依赖项是指未包含在您的 Amazon MWAA 环境的 Apache Airflow 基础安装中的任何包或分发。本篇中,我们使用 Boto3 version gt=1239。blogdagmwaaemrsnytaxipy 该脚本是 Amazon MWAA DAG 的一部分,包括任务 yellowtaxizonelookup、greentaxizonelookup 和 nytaxisummary。这些任务涉及运行 Spark 作业以查找出租车区域并生成数据摘要。greenzonepy 该 PySpark 脚本读取绿色出租车行程和区域查找的数据文件,执行连接操作以将它们合并,并生成包含绿色出租车行程和区域信息的输出文件。它利用临时视图和数据框进行列连接,并聚合乘客数、行驶距离和费用等数据。最后,它在指定的 S3 存储桶中创建 outputdata 文件夹,以 Parquet 文件的形式写入结果数据框 dfgreenzone。yellowzonepy 该 PySpark 脚本处理黄色出租车行程和区域查找的数据,通过连接生成包含黄色出租车行程和区域信息的输出文件。该脚本接受用户提供的 S3 存储桶名称,并以应用程序名称 yellowzone 初始化一个 Spark 会话。它从指定的 S3 存储桶读取黄色出租车文件和区域查找文件,创建临时视图,基于位置 ID 进行连接,并计算乘客数、行驶距离和费用等统计数据。最后,它在指定的 S3 存储桶中创建 outputdata 文件夹,以 Parquet 文件的形式写入结果数据框 dfyellowzone。nytaxisummarypy 该 PySpark 脚本处理 greenzone 和 yellowzone 文件,聚合出租车行程的统计数据,按服务区域和位置 ID 分组数据。它需要 S3 存储桶名称作为命令行参数,创建名为 nytaxisummary 的 SparkSession,从 S3 读取文件,执行连接并生成新的名为 nytaxisummary 的数据框。它在指定的 S3 存储桶中创建 outputdata 文件夹,以新的 Parquet 文件的形式写入结果数据框。在本地计算机上,更新 blogdagmwaaemrsnytaxipy 脚本中的以下信息:在以下两行中更新您的 S3 存储桶名:
pythonS3LOGSBUCKET = ltltbucketnameheregtgtS3BASEBUCKET = ltltbucketnameheregtgt
更新您的角色名称 ARN:
pythonJOBROLEARN = “ltltemrserverlessexecutionrole ARN heregtgt”eg arnawsiamltltACCOUNTIDgtgtrole/ltltROLENAMEgtgt
更新 EMR Serverless 应用 ID。使用之前创建的应用 ID:
pythonEMRSERVERLESSAPPLICATIONID = “ltltemr serverless application ID heregtgt”
将 requirementstxt 文件上传到之前创建的 S3 存储桶。
在 S3 存储桶中,创建一个名为 dags 的新文件夹,并上传从本地计算机更新的 blogdagmwaaemrsnytaxipy 文件。在 Amazon S3 控制台中,在 S3 存储桶内部创建一个名为 scripts 的新文件夹,并从本地计算机将脚本上传到该文件夹。创建 Amazon MWAA 环境
要创建 Airflow 环境,请完成以下步骤:
在 Amazon MWAA 控制台中,选择 创建环境。在 名称 中,输入 mwaaemrsathenapipeline。在 Airflow 版本 中,选择最新版本本篇为 251。在 S3 存储桶 中,输入您 S3 存储桶的路径。在 DAGs 文件夹 中,输入您的 dags 文件夹的路径。在 要求文件 中,输入 requirementstxt 文件的路径。选择 下一个。在 虚拟私有云 (VPC) 中,选择一个至少具有两个私有子网的 VPC。这将填充您的 VPC 中两个私有子网。
在 Web 服务器访问 下,选择 公用网络。这允许获得访问权限的用户通过 IAM 政策 访问 Apache Airflow UI。
对于 安全组,选择 创建新安全组。对于 环境类,选择 mw1small。对于 执行角色,选择 创建新角色。对于 角色名称,输入一个名称。保持其它配置为默认值并选择 下一个。在下一页面上,选择 创建环境。创建您的 Amazon MWAA 环境可能需要大约 2030 分钟。
当 Amazon MWAA 环境状态更改为 可用 时,导航到 IAM 控制台并更新集群执行角色,以添加 传递角色权限 至 emrserverlessexecutionrole。触发 Amazon MWAA DAG
要触发 DAG,请完成以下步骤:
在 Amazon MWAA 控制台中,选择导航窗格中的 环境。打开您的环境并选择 打开 Airflow UI。选择 blogdagmwaaemrnytaxi,选择播放图标,点击 触发 DAG。当 DAG 正在运行时,选择 DAG blogdagmwaaemrsnytaxi,点击 图形 以查看 DAG 执行工作流。DAG 完成全部脚本的运行大约需要