经典案例

使用 Amazon Redshift 对存储在 Amazon DocumentDB 中的数据进行近实

2026-01-27 12:44:28
15次

使用 Amazon Redshift 实现近实时分析

关键要点

在本篇文章中,我们学习如何将数据从 Amazon DocumentDB (兼容 MongoDB) 流式传输到 Amazon Redshift,从而解锁近实时分析和洞察。我们将涵盖 Amazon DocumentDB 更改流 和 Amazon Redshift 流式摄取 的使用,结合 AWS Lambda 和 Amazon Kinesis 数据流。我们还提供了一个 AWS CloudFormation 模板,以便于这些服务的轻松部署。

概述

在本篇文章中,我们将讨论一个金融公司如何利用 Amazon DocumentDB 作为客户资料的数据存储。认识到数据分析的重要性,他们现在希望获取有关客户行为、产品性能和收入分析的有价值洞察。为了实现这个目标,他们计划从 Amazon DocumentDB 实现近实时复制到 Amazon Redshift,这样可以使用 Redshift 强大的查询功能进行复杂的数据转换。通过从 Amazon DocumentDB 到 Amazon Redshift 的近实时数据流传输,他们能够基于最新信息进行数据分析,帮助他们及时做出数据驱动的决策。

以下是解决方案的整体架构图。

架构阶段:

启用特定集合的 更改流 的 Amazon DocumentDB 集群。每次新文档被添加到启用更改流的集合时,会触发 Lambda 函数。Lambda 函数将传入的文档写入 Kinesis 数据流。利用流式摄取,数据从数据流传输到 Amazon Redshift,从而实现近实时分析。

这种架构使得数据可以持续地从 Amazon DocumentDB 流向 Amazon Redshift,为近实时分析和洞察提供无缝的数据流。

部署 CloudFormation 栈

使用以下 CloudFormation 模板 部署栈,创建实现和测试解决方案所需的大部分资源。

CloudFormation 部署以下资源:

一个名为 DocDBStreaming 的 AWS Cloud9 环境,使用 t2micro 类型的 Amazon Elastic Compute Cloud 实例,默认操作系统为 Amazon Linux 2。一个名为 transactcluster 的 Amazon DocumentDB 实例集群,版本为 50。一个 Amazon Redshift Serverless 命名空间。在 AWS Secrets Manager 中存储 Amazon DocumentDB 和 Amazon Redshift 集群的用户名称和密码的密钥。一个名为 docdbstream 的 Kinesis 数据流,容量模式设置为按需。一个名为 {AWSStackName}DocumentDBLambdaESM{AWSRegion} 的 Lambda 函数,用于将数据流向 Kinesis 数据流。一个AWS 身份与访问管理 (IAM) 角色 {AWSStackName}DocumentDBLambdaESMRole{AWSRegion},给予 Lambda 函数访问 DocumentDB 集群的权限。

完成栈的部署后,可以继续配置和测试解决方案。

使用 AWS Cloud9 连接到 Amazon DocumentDB 集群

在从 AWS Cloud9 连接到 Amazon DocumentDB 集群之前,我们需要安装所有必需的库。请完成以下步骤:

从 AWS Cloud9 控制台启动 DocDBStreaming 环境。运行以下命令以安装所需的软件包和 Amazon DocumentDB 证书密钥:

bash

设置 mongo 40 库

echo e [mongodborg40] nname=MongoDB Repositorynbaseurl=https//repomongodborg/yum/amazon/201303/mongodborg/40/x8664/ngpgcheck=1 nenabled=1 ngpgkey=https//wwwmongodborg/static/pgp/server40asc sudo tee /etc/yumreposd/mongodborg40repo

安装软件包

sudo yum y updatesudo yum y install mongodborgshell

下载 SSL 文件和加载器

wget https//truststorepkirdsamazonawscom/global/globalbundlepem

现在您可以连接到 Amazon DocumentDB 集群。

访问 Secrets Manager 控制台,获取 DocumentDB 集群的凭证。找到名为 secretDocDBAdminUserlt随机生成字符串gt 的密钥。访问 Amazon DocumentDB 控制台,在导航面板中选择 集群。选择集群 transactcluster。在 连接与安全性 部分,复制 Mongo shell 命令,并插入来自 Secrets Manager 的用户名和密码。

bash

连接到集群

mongo ssl host lt 集群端点gt27017 sslCAFile globalbundlepem username lt 用户名gt password lt 输入密码gt

创建数据库和集合,并启用更改流

创建数据库 findata 和集合 financialData 以捕获交易和客户资料,然后为该集合启用更改流。请使用以下命令:

bash

使用 Amazon Redshift 对存储在 Amazon DocumentDB 中的数据进行近实

创建数据库 findata

use findata

创建集合 financialData

dbcreateCollection(financialData)

为集合启用更改流

dbadminCommand({modifyChangeStreams 1database findatacollection financialDataenable true})

向新集合中插入记录

dbfinancialDatainsertOne({customerid 116 customername Nina Quinn customeremail ninaq@examplecom customeraddress 123 Aspen St transactionamount 85000 transactiontimestamp 20230116 130000 transactiontype Purchase paymentmethod Credit Card transactionstatus Completed})

super加速器官网入口

请注意,更改流可以在不同的层面进行启用,包括集群、数据库和集合层面。但是,对我们而言,我们仅在集合层面启用它们。

为 Lambda 函数配置触发器

创建一个触发器以便在 financialData 集合中发生更改时调用 Lambda 函数:

在 Lambda 控制台,导航到函数 {AWSStackName}DocumentDBLambdaESM{AWSRegion}。选择 配置 选项卡,然后在左侧窗格中选择 触发器。选择 添加触发器。在 触发器配置 中,源选择 DocumentDB。创建事件源映射,配置如下:对于 DocumentDB 集群,选择集群 transactcluster。对于 数据库名称,输入 findata。对于 集合名称,可以留空或输入 financialData。对于 批次大小,输入 100。对于 起始位置,选择 最新。对于 身份验证,选择 BASICAUTH。对于 Secrets Manager 密钥,输入使用 CloudFormation 模板创建的 DocDBSecret。对于 完整文档配置,选择 UpdateLookup。

请注意,事件源映射的流轮询在创建和更新时表现出最终一致性。这意味着在开始或重新开始事件轮询时可能会有延迟,因此设置流的起始位置为 LATEST 可能会导致在这些时间段内错过事件。为防止这种情况,建议使用 TRIMHORIZON 或 ATTIMESTAMP 作为起始位置,以确保捕获所有事件。有关如何配置更改流的更多信息,请参见 使用 Amazon DocumentDB 的更改流。

选择 添加。等待事件源映射的创建可能需要几分钟,并将状态更改为 启用。

接下来,我们在 AWS Cloud9 中向 Amazon DocumentDB 插入一些记录,以测试解决方案并确认我们在 Amazon Redshift 中看到数据。

向 Amazon DocumentDB 插入数据

插入数据到 Amazon DocumentDB:

bashdbfinancialDatainsertOne({customerid 117 customername Oscar Reed customeremail oscarr@examplecom customeraddress 456 Birch St transactionamount 90000 transactiontimestamp 20230117 140000 transactiontype Refund paymentmethod Debit Card transactionstatus Failed})dbfinancialDatainsertOne({customerid 118 customername Paula Stone customeremail paulas@examplecom customeraddress 789 Cedar St transactionamount 95000 transactiontimestamp 20230118 150000 transactiontype Purchase paymentmethod PayPal transactionstatus Pending})dbfinancialDatainsertOne({customerid 119 customername Quincy Tate customeremail quincyt@examplecom customeraddress 321 Douglas St transactionamount 100000 transactiontimestamp 20230119 160000 transactiontype Purchase paymentmethod Credit Card transactionstatus Completed})dbfinancialDatainsertOne({customerid 120 customername Rachel Underwood customeremail rachelu@examplecom customeraddress 654 Elm St transactionamount 105000 transactiontimestamp 20230120 170000 transactiontype Refund paymentmethod Debit Card transactionstatus Completed})dbfinancialDatainsertOne({customerid 121 customername Steve Victor customeremail stevev@examplecom customeraddress 987 Fir St transactionamount 110000 transactiontimestamp 20230121 180000 transactiontype Purchase paymentmethod Credit Card transactionstatus Pending})dbfinancialDatainsertOne({customerid 122 customername Tina Walker customeremail tinaw@examplecom customeraddress 123 Grove St transactionamount 115000 transactiontimestamp 20230122 190000 transactiontype Purchase paymentmethod PayPal transactionstatus Failed})dbfinancialDatainsertOne({customerid 123 customername Uma Xander customeremail umax@examplecom customeraddress 456 Holly St transactionamount 120000 transactiontimestamp 20230123 200000 transactiontype Refund paymentmethod Credit Card transactionstatus Completed})dbfinancialDatainsertOne({customerid 124 customername Victor Young customeremail victory@examplecom customeraddress 789 Ivy St transactionamount 125000 transactiontimestamp 20230124 210000 transactiontype Purchase paymentmethod Debit Card transactionstatus Pending})dbfinancialDatainsertOne({customerid 125 customername Wendy Zane customeremail wendyz@examplecom customeraddress 321 Juniper St transactionamount 130000 transactiontimestamp 20230125 220000 transactiontype Purchase paymentmethod Credit Card transactionstatus Completed})

请注意,当在数据库或集群层启用更改流时,需要适当扩展 Lambda 函数以响应写入吞吐量。这种扩展可能会导致在数据持久化后可用数据在 Amazon Redshift 上的延迟增加。选择合适的批量大小对 Lambda 函数的效率和性能至关重要。此外,了解 Lambda 并发性指可以同时运行的 Lambda 实例数量也很重要。有关 Lambda 扩展和吞吐量的深入探讨,请参考 理解 AWS Lambda 的扩展和吞吐量。

在某些情况下,Lambda 的中断可能会导致重复事件的发生。为了解决这些稀有但重要的情况,正确配置 Lambda 的事件源映射并开发健壮的错误处理机制至关重要。有关如何使 Lambda 函数具备幂等性即关键策略之一的信息,请参阅 如何使我的 Lambda 函数具备幂等性。此外,有关如何配置与 Amazon DocumentDB 一起使用 Lambda 函数的指导,请参见 将 Lambda 与 Amazon DocumentDB 一起使用。

我们已经成功配置了初始阶段,以捕获 Amazon DocumentDB 中的更改并将其流式传输到 Kinesis 数据流。接下来,我们将设置 Amazon Redshift 流式摄取。这一步骤涉及从 Kinesis 数据流中读取数据并将其加载到 Redshift 表中。

设置 Amazon Redshift 流式摄取

在 Amazon Redshift 中设置流式摄取是双步骤过程。您需要首先创建一个外部架构以映射到 Kinesis 数据流,然后创建一个物化视图来从流中提取数据。物化视图必须是可增量维护的。请按照以下步骤设置流式摄取:

在 Amazon Redshift 控制台中,选择导航窗格中的 查询编辑器 v2。

在查询编辑器中,选择左侧窗格中的工作组 Serverlessworkgroupstreamingblog,并切换到使用数据库用户名和密码。

访问 Secrets Manager,使用名为 secretRedshiftAdminUserlt随机生成字符串gt 的密钥中的用户名和密码。

运行以下命令以创建外部架构。这将把 Kinesis 数据流中的数据映射到 Redshift 对象:

sqlCREATE EXTERNAL SCHEMA findataanalysis FROM KINESISIAMROLE default

运行以下命令以创建物化视图。此物化视图会自动刷新,并在数据不断到达流时进行刷新:

sqlCREATE MATERIALIZED VIEW findataextract sortkey(1) AUTO REFRESH YES ASSELECTrefreshtimeapproximatearrivaltimestamppartitionkeyshardidsequencenumberjsonparse(kinesisdata) as payloadFROM findataanalysisdocdbstreamWHERE CANJSONPARSE(kinesisdata)

现在我们已经创建了物化视图,让我们检查已加载的数据。请注意,物化视图的第一次刷新可能会需要一段时间,因此您需要等待约一分钟再检查物化视图:

sqlSELECT payload from findataextract

我们可以在输出中确认正在将数据从 Amazon DocumentDB 集合 financialData 流向 Redshift 物化视图 findataextract。

创建物化视图上的视图以执行分析

为了使 financialData 集合能够实现近实时分析,我们可以在物化视图findataextract之上创建一个视图。此方法允许我们无缝地集成来自多个来源的数据,而无需等待提取、加载和转换 ELT操作。请参见以下代码:

sqlALTER USER awsuser set enablecasesensitiveidentifier =trueset enablecasesensitiveidentifier =trueCREATEOR REPLACE VIEW publicfindataextractview ASSELECTCAST(findataextractpayloadcustomerid AS VARCHAR) AS customeridCAST(findataextractpayloadcustomeremail AS VARCHAR) AS customeremailCAST(findataextractpayloadcustomeraddress AS VARCHAR) AS customeraddressCAST(findataextractpayloadtransactionamount AS VARCHAR) AS transactionamountCAST(findataextractpayloadtransactiontimestamp AS VARCHAR) AS transactiontimestampCAST(findataextractpayloadtransactiontype AS VARCHAR) AS transactiontypeCAST(findataextractpayloadpaymentmethod AS VARCHAR) AS paymentmethodCAST(findataextractpayloadtransactionstatus AS VARCHAR) AS transactionstatusFROMpublicfindataextract AS

全国咨询热线

13594780123

Super加速器(中国)官方网站|Super加速器

联系电话:13594780123

联系人:李总

邮箱:sworn@mac.com

公司地址:邵武市清塑囚牢483号


微信扫一扫

手机官网