为了使练习更加贴近实际业务场景,我们将模拟从应用程序中生成交易订单事件,在这种情况下是与交易流水、交易日期、客户编号、产品编号和一些数据相对应的事件流。在此教程中,将完成以下三个步骤的实验:

• 创建 Amazon Kinesis Data Stream
• 创建 Amazon Kinesis Data Analytics 应用程序
• 创建 Amazon Kinesis Data Firehose 将数据传送到 Amazon S3(Lab2 和 Lab3 需要用到)

上述三个步骤的实验的架构图如下

Kinesis 流数据产生

发送模拟数据

登录准备阶段部署的 EC2,保存如下代码到 ec2-user 的 home 目录下,我使用的 AWS 区域为us-east-1,如果你在其他区域创建的 EC2,请修改代码里面的region_name

https://imgs.wzlinux.com/aws/lab1.sh

然后执行如下代码开始给 Kinesis Data Streams 流平台发送模拟数据(这个2021-03-19在系统内是个交易日期,方便后续作为关键字查找,没有特殊的含义)

[ec2-user@ip-172-31-77-126 ~]$ sh lab1.sh kds-lab1 2021-03-19 &[1] 2768

lab1.sh会往 kds 流里面灌数据,格式为

  • tid: 交易id
  • tno: 交易编号
  • tdate: 交易日期
  • uno: 客户编号
  • pno: 产品编号
  • tnum: 交易数量
  • tuptime: 时间戳

如下仅供参考

{    "tid": "123",    "tno": "AwGi20200904131249",    "tdate": "2021-03-19",    "uno": "U1030",    "pno": "P1002",    "tnum": 10,    "tuptime": "2021-03-19T13:15:48Z"}

系统会生成一个日志文件,查看同一个目录下的日志文件,出现如下字样表示启动成功

准备S3存储桶

因为S3桶是全球唯一的命名,所以为了区分,我们采用如下的方式命名S3存储桶,如下所示

lab-AccountId-wzlinux-com

打开EC2客户端,使用如下命令创建S3桶(也可以直接在控制台创建,此处略)

aws s3 mb s3://lab-921283538843-wzlinux-com/aws s3 ls | grep lab

如下

创建S3终端节点

为了方便内网访问 S3 存储桶,此处我们配置 S3 终端节点。登录并打开 VPC 控制台,往下拉选择左边的终端节点,选中 S3(在搜索框里面输入 S3 并回车即可搜索),选择对应 VPC(此处我们只有一个默认VPC)和路由表:

其他默认,点击“创建终端节点”即可。

Kinesis 流数据分析

本实验演示配置数据流管道(为 Lab2/3 准备),并实时对流数据进行在线分析等。

配置 Kinesis Data Firehose

KFH(Kinesis Data Firehose)是提供实时交付的完全托管服务,可以把流数据发往诸如 Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service (Amazon ES)、Splunk 以及支持的第三方服务提供商(包括 DatAdog、MongoDB 和 NewRelic)拥有的任何自定义 HTTP 端点或 HTTP 端点。

打开 Kinesis 管理控制台,在左侧菜单栏选择 Delivery streams,在界面上点击“Create Delivery Stream”。

输入传输流的名字(此处为 lab1-kfh),并选择准备过程中创建的 Kinesis 数据流(此处为kds-lab1),然后点击下一步

第二步处理记录选择默认,第三步目标我们选择 S3,并选择之前创建的存储桶(此处为:lab-921283538843-wzlinux-com

然后在 S3 前缀和后缀分配输入名字lab1-input/lab1-error/,如下

配置缓冲的时候,设置为“1”M和“60”秒,其他默认,点击下一步,然后审核并点击“Create delivery stream”

大概过 1 分钟左右,就可以在 S3 存储桶里面看到对应的数据输出。

配置 Kinesis Data Analytics

打开 Kinesis 管理控制台,在左侧菜单栏选择“Data Analytics”,在界面上点击“Create application”,然后输入名字(此处为kas-lab1)和运行时(SQL)即可

创建成功后,选择连接数据流(连接到之前准备阶段创建的 kds 数据流,此处为kds-lab1,已经通过脚本在送数据了)

然后点击左下角的发现架构,开始获取元数据

发现后的架构和数据格式跟我们预期的一致,所以此处不做更改,直接保存即可

接下来我们选择用 SQL 做实时分析

然后使用如下 SQL 代码,保存并运行(我们此处演示按 1 分钟聚合,如果有其他聚合时间要求,可以修改最下面的60 这个数字即可)

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (count_tno integer,sum_tnum integer);CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"SELECT STREAM count(*), sum("tnum")FROM "SOURCE_SQL_STREAM_001"GROUP BY FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 60 TO SECOND);

略微等待一小段时间,我们即可看到运行结果,如下所示(和我们的代码预期一致,一分钟一次聚合)

可以在目标页面把查询结果输出到别的地方,例如别的流,别的 S3 存储桶等用于业务用途,此处不做演示。

至此,关于流数据处理的动手实验(Lab1)已经完成。

欢迎大家扫码关注,获取更多信息

©著作权归作者所有:来自51CTO博客作者wzlinux的原创作品,如需转载,请注明出处,否则将追究法律责任

喜欢我的文章,成为我的天使投资人吧

赞赏

0人进行了赞赏支持

更多相关文章

  1. Linux运维必会的100道MySql面试题之(一)
  2. Mongodb数据库基础入门(一)
  3. Linux运维必会的100道MySql面试题之(三)
  4. Python开发中字典和json有什么区别?
  5. mysql数据库常见报错及解决方法
  6. 增强分析对BI商业智能有什么作用及影响?
  7. python常用的数据库有哪些?五大类!
  8. K8S部署API网关Kong
  9. 强大的开源企业级数据库监控利器Lepus

随机推荐

  1. Linux: xclip,pbcopy,xsel用法 terminal
  2. Linux 性能查看命令:
  3. 熟悉Linux系统的操作
  4. Ubuntu安装软件提示boot空间不足
  5. Linux2.6.6内核下ACPI PCI Hot-Plug的实
  6. 一些常用的Linux命令
  7. jmap使用以及 linux下查看进程的内存使用
  8. linux中的磁盘分区
  9. Linux学习总结(十五)文件查找 which wherei
  10. Linux学习笔记_1.Linux常见指令