集团新闻

实现 AWS DataSync 与数亿个对象的集成 存储博客

2026-01-27 13:58:52
16次

使用 AWS DataSync 迁移数亿个对象

关键要点

AWS DataSync 近日宣布支持清单功能,允许用户指定待转移的对象,能够显著提升迁移效率。迁移大型数据集面临多个挑战,包括 RTO 和 RPO 的平衡,以及技术限制,比如延迟与带宽配额。本文将介绍 DataSync 在处理超过 1 亿个源对象时的有效方案,包括如何使用清单文件和事件驱动的方法提高数据迁移效率。

在混合云环境中迁移大量数据可能是一个令人畏惧的任务,尤其是当面临来自网络、存储、计算和操作系统层的技术限制时。用户在满足合规需求的同时,需要平衡其恢复时间目标 (RTO) 和恢复点目标 (RPO),并优化任务调度和传输时间。这些挑战可能表现为多种形式,包括延迟、带宽配额、以及待迁移数据集的大小和数量。

2024年2月7日,AWS DataSync 宣布支持 清单功能,这一新功能使用户能够提供待转移源文件或对象的明确定义列表。使用清单,用户能够通过仅指定需处理的文件或对象,减少任务执行时间。

在这篇文章中,我们将展示如何使用 DataSync 处理超过 1 亿个源对象并将其复制和同步到自管理存储。我们分享一些建议,帮助您更高效地迁移数据,并防止影响扩展能力的问题,如延长的数据传输等待时间。此外,我们将详细介绍如何使用 DataSync 发布的清单文件功能。

DataSync 概述

DataSync 简化并加速数据迁移至 AWS,帮助用户快速、安全地在本地存储、边缘位置、其他云和 AWS 存储之间移动数据。

利用 DataSync 迁移大量数据如超过 1 亿个 S3 对象的用户,需战略性地思考如何优化数据传输任务,同时在 DataSync 的固有阈值范围内进行操作。

清单文件概述

清单是您希望 DataSync 转移的文件或对象的列表。例如,与复制 S3 桶中的所有对象不同,DataSync 仅能复制清单中包含的对象。这一功能为用户提供了更精确的控制权,优于在需要针对大量特定文件时使用的 包含过滤器。

相关配额回顾

DataSync 配额文档 列出了以下信息:

配额项数值每个任务的最大文件或对象数量50000000每个任务执行的最大过滤器字符数100000最多并行排队的任务执行数量50每个任务排队的最大对象数2500000000

这是需要克服的第一限制,如果目标是达到超过 1 亿个对象。此外,在创建解决方案时,先前的最大过滤器字符数和执行历史配额也具有相关性。

解决方案概述

该解决方案包括以下几个步骤:

重新组织数据:分段源数据,使其保持在 DataSync 最大任务执行对象配额之内。基于事件的清单文件:配置任务使其只处理已知的新数据或变更的数据。使用包含过滤器的大批量迁移:以编程方式将源数据分割成大批量,以排队作为 DataSync 任务执行。

前提条件

该解决方案需要:

一个 AWS 账户一个 DataSync 代理中等Python编程技能

解决方案实施步骤

以下部分将逐步介绍各种方法。

重新组织数据

在配置 DataSync 任务时,确保源数据结构能够定义对象计数在 5000 万个以下,以避免超出任务配额。以下是进行这种重新组织的几种选项:

按路径将对象拆分为基于时间戳的独立桶。当对象数量达到某一门槛时,使用新的路径或桶。

然而,依据特定的工作负载和与源数据相关的其他外部因素,这些方法可能并不实用。

基于事件的清单文件

采用基于事件的方法可以确保只有必需通过 DataSync 迁移的文件在指定的任务执行范围内。我们通过调用 starttaskexecution 和 manifestconfig 来实现,这个清单中包含了 DataSync 任务执行的在范围内的文件/对象列表。关于应用程序编程接口API代码的详细信息,请参考 DataSync 文档。

清单文件限制了扫描和处理活动,仅对清单文件中标识的文件/对象进行操作。这不仅降低了整体 DataSync 执行时间,还减少了 Amazon S3 扫描成本。

在 Amazon S3 中生成新对象会产生 s3ObjectCreated 事件,该事件发送至 Amazon EventBridge。对于每个上传至 Amazon S3 的新对象,EventBridge 会发送 JSON 负载,描述该对象至 Amazon SQS 队列。将 Amazon SQS 作为事件目的地,可以持久捕获这些事件以供处理。

AWS Lambda 提供了处理这些事件负载所需的计算资源,允许与 Amazon SQS 直接集成。此集成为记录提供简单的缓冲,此缓冲可以配置长达五分钟。

依据您的源桶中文件上传的频率,缓冲时间可以超过最大五分钟。这样可以通过一次传递更多对象,减少 DataSync 任务的调用次数。但它会引入传输延迟。您需要评估传输大数据集的影响与 RPO 之间的取舍。

要实现超过五分钟的缓冲时间,您可以使用 Amazon EventBridge 的定时规则调用 Lambda。

从 Amazon SQS 检索消息

为了从 Amazon SQS 队列获取消息,Lambda 函数使用 boto3 Amazon SQS 客户端库中的 getqueueattributes 和 receivemessages API 调用。

Lambda 函数首先获取队列的长度,以理解可用消息的数量。该值可以在执行期间与队列中剩余项的数量进行对比,以确保所有消息都被提取。或者,您也可以限制每次任务执行中的对象数量。通过将剩余消息留在队列中可能会影响 RPO,或启动多个任务当需要在整体工作中暂停或恢复单个任务时,这可能会起到一定帮助。

pythonimport boto3import os

sqsclient = boto3client(sqs)queueurl = osenviron[QUEUEURL]

获取 SQS 队列长度

resp = sqsclientgetqueueattributes( QueueUrl=queueurl AttributeNames=[ ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible ])approximatequeuelength = resp[Attributes][ApproximateNumberOfMessages]

一旦知道 Amazon SQS 队列中的消息数量,我们将考虑发送到每个 DataSync 任务的消息总数以优化性能。对单个 DataSync 任务的消息过载可能导致延迟,从而影响新消息在 Amazon SQS 队列上的整体处理时间,因为 DataSync 按顺序处理任务执行,而任务在活动的 DataSync 任务完成之前会被排队。

为缓解这一情况,我们对 Amazon SQS 消息进行批量处理以减少 DataSync 任务上的负担。例如,如果在五分钟内将 1000 条消息批量处理,将导致单个 DataSync 任务执行支持每秒 33 新对象。除了选择每次批处理的消息数量外,您还应以固定的节奏如每五分钟清空 Amazon SQS 队列,以确保满足所需的 RPO。

在检索 Amazon SQS 消息的时间,receivemessage API 一次最多可以从 Amazon SQS 队列接收 10 条消息。在以下代码示例中,我们使用 while 循环从队列中检索消息,直到消息计数达到 1000 个,或您定义的其他数量。

pythonQUEUEURL = osenviron[queueurl] MAXRECEIVE = 20000

def processchunks(queuelength int)

processedmsgcount = 0maxreceive = MAXRECEIVEreceivecount = 0# 从 SQS 队列接收并构建消息列表while receivecount lt (maxreceive / 10) and processedmsgcount lt queuelength    receivecount = 1    try        resp = sqsclientreceivemessage(            QueueUrl=QUEUEURL AttributeNames=[All] MaxNumberOfMessages=10        )    except Exception as e        loggingerror(SQS 获取错误)        loggingerror(tracebackformatexc())    try        msgs = []        for message in resp[Messages]            msgsappend(message)            processedmsgcount = 1        yield msgs  # 发送到 DataSync 的消息    except KeyError        return

构建清单

在从 Amazon SQS 队列中检索消息后,我们需要在将这些消息传递给 DataSync 之前进行多个操作。

构建清单文件的步骤:

从每个 Amazon SQS 消息中提取 S3 对象的 “Key” 值。将清单文件写入 Amazon S3。确保没有重复项。确保没有文件夹或前缀注意,清单中只能包括对象和文件。

在将清单文件写入 Amazon S3 时,必须遵循以下规则:

指定完整的对象路径,不能仅指定文件夹/路径/目录。您可以指定特定版本的对象用逗号分隔以进行传输。每个对象之间用换行符分隔。将清单文件保存为 CSV 文件filenamecsv。

所需的 IAM 权限:

super免费加速器下载Lambda:Lambda 函数的执行角色必须包括对包含清单文件的路径桶/文件夹的 s3PutObject 权限。DataSync:DataSync 任务的 AWS 身份和访问管理IAM 角色必须包括对存储清单文件的路径桶/文件夹的 s3GetObject 权限,以及在指定对象版本时的 s3GetObjectVersion 权限。

另一个考虑因素是清单文件的名称。在任务执行期间或任务执行后,清单文件不得被删除或修改,否则无法重跑相同的任务。一种策略是使用 Lambda 调用的 executionID 来命名文件。这会防止重复,并将名称分配给执行 API。此技术需要定期清理清单桶,因为 Lambda 会持续生成新文件。

从每个消息中提取 S3 对象的 “Key” 值:

从 Amazon SQS 队列获取的消息列表现在可以加载到单独的函数中以提取每个消息的 S3 对象 Key 值。该函数处理该列表中的任何目录,并将其从清单文件中排除。

pythondef processchunkdata(messages list)

manifestdata = []processedmsgs = []for msg in messages    msgbody = msg[Body]    payload = jsonloads(msgbody)    msgkey = payload[detail][object][key]    if msgkey[1] != /  # 防止将目录加入清单        manifestdataappend(msgkey)    else        loggerinfo(f发现目录,跳过清单:{msgkey})    processedmsgsextend(        [{Id msg[MessageId] ReceiptHandle msg[ReceiptHandle]}]    )return (manifestdata processedmsgs)

确保我们具有的唯一元素列表,不重复,存储到清单文件中,我们在包含对象密钥的列表变量如 manifestdata上使用 Python set 方法。

将清单文件写入 Amazon S3:

一旦清单文件已加载需传输的对象,便应将文件上传至清单桶。

为了保持文件名唯一,本示例使用了来自 Lambda 调用的 contextawsrequestid。

pythondef publishmanifest(manifestdata set awsrequestid str idx int)

实现 AWS DataSync 与数亿个对象的集成 存储博客

filename = f{S3MANIFESTFOLDER}{awsrequestid}{idx}csvresult = s3resourceObject(S3BUCKETNAME filename)put(    Body=join(manifestdata)[1])if resultget(ResponseMetadata)get(HTTPStatusCode) == 200    loggerinfo(f清单上传:成功:大小:{len(manifestdata)} 文件:{S3BUCKETNAME} {filename})else    sysexit(1)return filename

调用 DataSync

一旦清单文件创建并上传至 Amazon S3,便可执行 DataSync 任务。Lambda 函数调用 StartTaskExecution API,引用清单文件的路径。

pythondsclient = boto3client(datasync)DSTASKARN = osenviron[dstaskarn]

dsclientstarttaskexecution( TaskArn=DSTASKARN OverrideOptions={ VerifyMode ONLYFILESTRANSFERRED TransferMode ALL } ManifestConfig={ Action TRANSFER Format CSV Source { S3 { ManifestObjectPath manifestfile S3BucketArn S3BUCKETMANIFESTARN BucketAccessRoleArn DSS3ROLEARN } } })

从 Amazon SQS 队列中删除成功转移的消息

一旦 DataSync 任务成功运行,Amazon SQS 队列中的消息便可以安全移除。之前的 processchunkdata 函数捕获了在任务执行成功时应删除的消息,这一值被传入以下的 deletemessages 函数。

pythonQUEUEURL = osenviron[queueurl]def deletemessages(processedmsgs list)

totalmsgs = len(processedmsgs)processedmsgbuffer = processedmsgsdeletedmsgcount = 0while len(processedmsgbuffer) gt 0    deletemsglist = processedmsgbuffer[010]    processedmsgbuffer = processedmsgbuffer[10]    try        resp = sqsclientdeletemessagebatch(            QueueUrl=QUEUEURL Entries=deletemsglist        )        deletedmsgcount = len(resp[Successful])    except Exception as e        loggingerror(tracebackformatexc())if deletedmsgcount != totalmsgs    raise RuntimeError(        f删除消息失败:总消息={totalmsgs!r} 响应={resp!r}    )else    loggerinfo(f已从队列中删除 {deletedmsgcount} 条消息)return totalmsgs

使用包含过滤器的大批量迁移

使用 DataSync 迁移大批量数据适合狭义情况下进行初始移植。如果不频繁重复任务,并且您有实际时间限制,离线替代solution可能是 AWS Snowball。DataSync 也适用于结合事件驱动方法稀疏地运行全数据集的验证。

使用 DataSync,您可以使用 StartTaskExecution 以及将 VerifyMode 选项设置为 POINTINTIMECONSISTENT,以检查和验证完整数据集。在这种情况下,由于意图是传输和验证完整数据集,因此不需要使用清单。如果在其他用例中与清单一起使用 VerifyMode,则需要将其设置为 ONLYFILESTRANSFERRED 以此仅针对标识并传输的文件。

对象配额

在使用批量处理方法时,有一些进一步的理论最大值需要讨论,以理解上限配额。

每个任务执行的最大文件或对象数量为 5000 万。每个任务的最大并发执行排队数为 50。每个任务排队的最大对象数为 25 亿,如果排队了最大数量的任务。

首先

全国咨询热线

13594780123

Super加速器(中国)官方网站|Super加速器

联系电话:13594780123

联系人:李总

邮箱:sworn@mac.com

公司地址:邵武市清塑囚牢483号


微信扫一扫

手机官网