作者 Sriharsh Adari Joe Morotti Uma Ramadoss发表日期 2023年12月18日分类 高级 (300),Amazon RDS,AWS Glue,AWS Step Functions,技术教程永久链接 评论
AWS Step Functions 是一个完全托管的可视化工作流服务,使您能够构建复杂的数据处理管道,涉及各种提取、转换和加载ETL技术,如 AWS Glue、Amazon EMR 和 Amazon Redshift。您可以通过将单个数据管道任务连接起来,并以最少的代码配置有效负载、重试和错误处理来直观地构建工作流。
虽然 Step Functions 支持当数据管道任务因瞬时或过渡性错误而失败时的自动重试和错误处理,但在管道运行过程中可能会出现如权限不正确、数据无效及业务逻辑失败等永久性错误。这将需要您识别步骤中的问题,修复该问题,并重新启动工作流。过去,重新运行失败的步骤需要从头开始重启整个工作流,这会导致工作流完成的延迟,尤其是在复杂且长时间运行的 ETL 管道中。如果管道中使用了许多映射和并行状态,这也会导致由于从头开始运行管道而增加成本。
现在,Step Functions 新增了支持从失败、被中止或超时状态重新驱动工作流的能力,这样您可以更快、更低成本地完成工作流,并花更多时间提供业务价值。当下游问题得到解决后,您可以使用提供给失败状态的相同输入迅速恢复未处理的故障。
在本文中,我们展示了一个 ETL 管道作业,它使用 Step Functions 的分布式映射状态从 Amazon RDS 表中导出数据。然后,我们模拟一个故障并演示如何使用新的重新驱动功能从故障点重新启动失败的任务。
数据管道中一个常见的功能是从多个数据源提取数据并将其导出到数据湖或同步到另一个数据库。您可以使用 Step Functions 的分布式映射状态并行运行数百个导出或同步作业。分布式映射可以从 Amazon S3 读取数百万个对象或从单个 S3 对象读取数百万条记录,并将记录分发到下游步骤。Step Functions 在分布式映射中作为子工作流运行步骤,最大并行度可达 10000。这一并发数量远高于许多其他 AWS 服务的支持并发性,例如 AWS Glue,后者的作业运行软限制为 1000。
示例数据管道从 Amazon DynamoDB 获取产品目录数据,并从 Amazon RDS for PostgreSQL 数据库获取客户订单数据。然后对数据进行清洗、转换,并上传至 Amazon S3 以供进一步处理。数据管道以 AWS Glue 爬虫 开始,创建 RDS 数据库的 数据目录。由于启动 AWS Glue 爬虫是异步的,因此管道中有一个等待循环,以检查爬虫是否完成。在 AWS Glue 爬虫完成后,管道从 DynamoDB 表和 RDS 表中提取数据。因为这两个步骤是独立的,所以它们作为并行步骤运行:一个使用 AWS Lambda 函数将数据从 DynamoDB 导出、转换并加载到 S3 存储桶中,另一个使用带有 AWS Glue 作业同步集成 的分布式映射将其从 RDS 表导出到 S3 存储桶。请注意,调用 AWS Glue 作业时需要 AWS 身份与访问管理 (IAM) 权限。有关更多信息,请参考 通过 Step Functions 调用 AWS Glue 作业的 IAM 策略。
以下图示说明了 Step Functions 工作流。
在 RDS 数据库中有多个与客户和订单数据相关的表。Amazon S3 托管所有表的元数据,作为 csv 文件。管道利用 Step Functions 的分布式映射从 Amazon S3 中读取表元数据,迭代每一项,并并行调用下游 AWS Glue 作业以导出数据。以下是相关代码:
jsonStates { Map { Type Map ItemProcessor { ProcessorConfig { Mode DISTRIBUTED ExecutionType STANDARD } StartAt Export data for a table States { Export data for a table { Type Task Resource arnawsstatesgluestartJobRunsync Parameters { JobName ExportTableData Arguments { dbtable tables } } End true } } } Label Map ItemReader { Resource arnawsstatess3getObject ReaderConfig { InputType CSV CSVHeaderLocation FIRSTROW } Parameters { Bucket 123456789012stepfunctionredrive Key tablescsv } } ResultPath null End true }}
要部署此解决方案,您需要满足以下前提条件:
一个有效的 AWS 账户适当的 IAM 权限以部署 AWS CloudFormation 堆栈资源完成以下步骤以通过 AWS CloudFormation 部署解决方案资源:
flyingbird 飞鸟机场选择 启动堆栈 以启动 CloudFormation 堆栈
输入堆栈名称。
选择 能力和转换 下的所有复选框。选择 创建堆栈。CloudFormation 模板会创建多个资源,包括以下内容:
之前描述的作为 Step Functions 工作流的数据管道一个用于存储导出的数据和 Amazon RDS 表元数据的 S3 存储桶一个 DynamoDB 中的产品目录表预加载表的 RDS for PostgreSQL 数据库实例一个爬取 RDS 表并创建 AWS Glue 数据目录的 AWS Glue 爬虫一个参数化的 AWS Glue 作业,将数据从 RDS 表导出到 S3 存储桶一个将数据从 DynamoDB 导出到 S3 存储桶的 Lambda 函数完成以下步骤以测试解决方案:
在 Step Functions 控制台中,选择导航窗格中的 状态机。选择名为 ETLProcess 的工作流。
使用默认输入运行工作流。
在几秒钟内,工作流在分布式映射状态处失败。
您可以通过访问 Step Functions 工作流执行事件查看映射运行错误。在此示例中,您可以识别异常是由于 AWS Glue 的 GlueConcurrentRunsExceededException。该错误指示请求运行 AWS Glue 作业的并发请求超过了已配置的数量。分布式映射从 Amazon S3 读取表元数据,并根据 csv 文件中的行数调用尽可能多的 AWS Glue 作业,但 AWS Glue 作业在创建时设置的并发性为 3。这导致子工作流失败,将故障级联到分布式映射状态,然后是并行状态。并行状态中用于获取 DynamoDB 表的另一步骤成功运行。如果并行状态中的任何步骤失败,整个状态都将失败,如级联故障所示。

默认情况下,当状态报告错误时,Step Functions 会导致工作流失败。您可以有多种方式处理分布式映射状态的故障:
Step Functions 让您能够 捕获错误、重试错误和回退到其他状态 以优雅地处理错误。以下是相关代码:
jsonRetry [ { ErrorEquals [ GlueConcurrentRunsExceededException ] BackoffRate 20 IntervalSeconds 10 MaxAttempts 3 Comment Exception JitterStrategy FULL }]
有时,企业可以容忍故障。这在处理数百万项目时尤其如此,期待数据集中的数据质量问题。默认情况下,当映射状态的某个迭代失败时,所有其他迭代都会被中止。在分布式映射中,您可以指定最大失败项目的数量或比例作为失败阈值。如果失败在可接受范围内,分布式映射不会失败。
分布式映射状态允许您控制子工作流的并发性。您可以将并发性设置为映射到 AWS Glue 作业的并发性。请记住,这一并发性仅在工作流执行级别适用,而非跨工作流执行。在修复错误根本原因后,您可以从故障点重新驱动失败状态。示例解决方案中的问题根本原因是 AWS Glue 作业的并发性。为了解决此问题,您可以通过重新驱动失败状态来解决,完成以下步骤:
在 AWS Glue 控制台中,导航至名为 ExportsTableData 的作业。在 作业详情 标签下,更新 最大并发性 为 5。借助重新驱动功能,您可以使用重新驱动功能重新启动 标准工作流 的执行,这些工作流在过去 14 天内未成功完成。这包括失败、被中止或超时的运行。您只能从失败的步骤重新驱动工作流,使用与上一个非成功状态相同的输入。您不能使用与初始工作流执行不同的状态机定义重新驱动失败的工作流。在成功重新驱动失败状态后,Step Functions 会自动运行所有下游任务。有关分布式映射重新驱动工作方式的更多信息,请参考 重新驱动映射运行。
由于分布式映射将映射内的步骤作为子工作流运行,因此工作流 IAM 执行角色需要权限来重新驱动映射运行以重新启动分布式映射状态:
json{ Version 20121017 Statement [ { Effect Allow Action [ statesRedriveExecution ] Resource arnawsstatesuseast2123456789012executionmyStateMachine/myMapRunLabel } ]}
您可以通过 AWS 命令行界面 (AWS CLI) 或 AWS SDK 编程来从失败的步骤重新驱动工作流,或者使用提供可视化操作体验的 Step Functions 控制台。
在 Step Functions 控制台中,导航至要重新驱动的失败工作流。在 详细信息 标签下,选择 从错误中重新驱动。现在,由于有足够的并发性来运行 AWS Glue 作业,管道成功运行。
要通过编程方式从其故障点重新驱动工作流,请调用 新重新驱动执行 API 操作。相同的工作流从最后一个非成功状态开始,并使用初始失败工作流中的最后一个非成功状态的相同输入。要重新驱动的状态来自工作流定义,之前的输入是不可变的。
注意以下关于不同类型子工作流的内容:
表达子工作流的重新驱动 对于在分布式映射中作为表达工作流存在的失败子工作流,重新驱动功能确保从子工作流的开头无缝恢复。这允许您解决特定于单个迭代的问题,而无需重新启动整个映射。标准子工作流的重新驱动 对于分布式映射内的标准工作流中的失败子工作流,重新驱动功能的运作方式与独立标准工作流相同。您可以在每个映射迭代内从其故障点重新启动失败状态,跳过已成功运行的不必要步骤。您可以使用 Step Functions 状态变化通知 与 Amazon EventBridge 设置故障通知,例如在故障时发送电子邮件。
要清理您的资源,请通过 AWS CloudFormation 控制台删除 CloudFormation 堆栈。
在本文中,我们展示了如何使用 Step Functions 的重新驱动功能通过从故障点重新启动失败的步骤来重新驱动分布式映射中的一步。分布式映射状态允许您编写协调大规模并行工作负载的工作流,使之适用于无服务器应用程序。Step Functions 最大并行度可达 10000,远高于许多 AWS 服务的支持并发性。
要了解更多关于分布式映射的信息,请参考 [Step Functions 分布式映射](https//server