Structured Streaming 最初是在 Apache Spark 2.0 中引入的,它已被证明是构建分布式流处理应用程序的最佳平台。SQL/Dataset/DataFrame API 和 Spark 的内置函数的统一使得开发人员可以轻松实现复杂的需求,比如支持流聚合、流-流 Join 和窗口。自从 Structured Streaming 发布以来,社区的开发人员经常要求需要更好的方法来管理他们的流作业,就像我们在 Spark Streaming 中所做的那样。为此,Apache Spark 3.0 为 Structured Streaming 开发了一套全新的 UI。

新的 Structured Streaming UI 通过有用的信息和统计信息提供了一种简单的方法来监控所有流作业,从而使开发调试期间的故障排除变得更容易,在生产环境下通过实时度量更好的理解我们的作业瓶颈。新的 UI 提供了两组统计信息:

流查询作业的聚合信息;流查询的详细统计信息,包括输入速率(Input Rate)、处理速率(Process Rate)、输入行数(Input Rows)、批处理持续时间(Batch Duration,)、操作持续时间等(Operation Duration)。

流查询作业的聚合信息

当开发人员提交一个流 SQL 查询时,这个作业的信息将在 Structured Streaming 选项卡中显示,其中包括活动的流查询和已完成的流查询。流查询的一些基本信息将在结果表中列出,包括查询名称、状态、ID、运行 ID、提交时间、查询持续时间、最后一个批次的 ID 以及聚合信息,如平均输入速率和平均处理速率。流查询的状态有三种:运行(RUNNING),完成(FINISHED)以及失败(FAILED)。所有完成的和失败的查询都在已完成的流查询列表中显示。表格中的错误列(Error)显示失败查询的异常详细信息。具体如下:

我们可以通过单击表格中 Run ID 那列链接查看流查询的详细统计信息。

详细统计信息

统计信息页面显示了包括输入/处理速率、延迟和详细操作持续时间在内的指标,这对于洞察流查询的状态非常有用,使我们能够轻松地调试流作业运行过程中的异常情况。页面如下所示:

上图包含以下的监控信息:

Input Rate: 所有数据源数据流入的聚合之后速度Process Rate: Spark 处理所有数据源的处理速度,也是聚合后的结果。Batch Duration: 每个批次处理时间。Operation Duration: 执行各种操作所花费的时间,以毫秒为单位。

使用新的 UI 进行故障排除

在这一小节,让我们来看看如何使用 Structured Streaming 新的 UI 来进行异常排除。我们的测试代码如下:


import java.util.UUID
val bootstrapServers = ...val topics = ...val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString
val lines = spark    .readStream    .format("kafka")    .option("kafka.bootstrap.servers", bootstrapServers)    .option("subscribe", topics)    .load()    .selectExpr("CAST(value AS STRING)")    .as[String]
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
val query = wordCounts.writeStream    .outputMode("complete")    .format("console")    .option("checkpointLocation", checkpointLocation)    .start()

由于处理能力不足而导致延迟增加

在第一种情况下,我们运行查询来尽快处理 Apache Kafka 中读取的数据。在每批中,流作业将处理 Kafka 中所有可用的数据。如果我们的资源不足以快速处理当前批次的数据,那么延迟将迅速增加。最直观的判断是输入行和批处理持续时间会呈线性增长。处理速率(Process Rate)提示流作业最多只能处理大约8,000条记录/秒。但是当前的输入速率大约是每秒 20,000 条记录。我们可以为流作业提供更多的执行资源,或者添加足够的分区来处理这些数据。

处理时间比较稳定但延迟很高

这种情况相比第一种情况是处理延迟没有持续增加,具体如下所示:

我们发现在相同的输入速率(Input Rate)下,处理速率(Process Rate)可以保持稳定。这意味着作业的处理能力足以处理输入数据。然而,每批处理的进程持续时间(即延迟)仍然高达20秒。高延迟的主要原因是每个批处理中有太多数据需要处理。通常我们可以通过增加作业的并行性来减少延迟。在为 Spark 任务添加了10个Kafka分区和10个核心之后,我们发现延迟大约为5秒——比20秒要好得多。

使用 Operation Duration 图来进行异常诊断

操作持续时间(Operation Duration)图以毫秒为单位显示执行各种操作所花费的时间。这对于了解每个批次的时间分布并简化故障排除很有用。让我们以Apache Spark 社区中的性能改进 SPARK-30915 为例进行说明。

在 SPARK-30915 工作之前,当压缩后的元数据日志变得很大时,压缩后的下一批处理要比其他批处理花费更多的时间。

经过对代码进行分析之后,发现并修复了不必要的读取压缩日志文件的问题,也就是 SPARK-30915 解决的,下图的运行时间确认了我们预期的效果:



未来工作

通过上面三个案例,新的 Structured Streaming UI 将帮助开发人员通过更加有用的流查询信息来更好地监视流作业。作为早期发布版本,新的 UI 仍在开发中,并将在以后的版本中进行改进,包括但不限于以下功能:

更多流查询执行细节:延迟数据(late data),水印(watermark),状态数据指标(state data metrics)等等。Spark 历史服务器中支持 Structured Streaming UI。针对异常情况的更明显的提示:比如延迟发生等。


©著作权归作者所有:来自51CTO博客作者mob604756e9d3bc的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 来自 Facebook 的 Spark 大作业调优经验
  2. 0401作业
  3. 一文了解 Apache Hive 联邦查询(Query Federation)
  4. Prism:Uber 的 Presto 查询网关服务
  5. js作业一常识类
  6. 战斗民族开源 | ClickHouse万亿数据双中心的设计与实践
  7. 0402作业-作用域与闭包、类与类的继承
  8. Java全栈工程师
  9. MySQL8.0发布,你熟悉又陌生的Hash Join?

随机推荐

  1. Android(安卓)JNI入门第三篇――jni头文
  2. Android软键盘-弹起时布局向上拉-登录界
  3. Error running app: Instant Run require
  4. Android中Activity的4种加载模式
  5. android中的震动
  6. android asset中 zip包解压sdcard
  7. Android(安卓)线程 Handler详解
  8. Android全屏Activity的几种方式
  9. Android getResources的作用和需要注意点
  10. 15个Android很有用的代码片段