Spark PMC 亲临 Kyligence ,现场解读 Spark 生态圈最新动向

2019年 11月 06日

近日,Databricks 融资四个亿估值 62 亿美金的新闻引爆了整个技术圈。Spark 历经 10 年发展,已经成为当今最炙手可热的开源技术框架之一。熟悉我司的朋友都知道,我们的最新产品已经实现了 all On Spark,不管是构建引擎还是查询引擎,所有的管理全都基于 Spark 运作。全栈 Spark 架构不仅给构建和查询带来更好的性能,提升服务的时间响应的及时性,也能为企业客户减少采购成本和降低运维成本。

上个月 18 日,我们非常荣幸邀请到了 Spark PMC 李潇来到 Kyligence 交流分享,在此也特别感谢大神不辞辛劳来我司交流分享。

李潇:Databricks Engineering Manager,Apache Spark 项目 Committer & PMC,管理两组跨国团队,专注于 Apache Spark 和 Databricks Runtime 的开发和建设,同时也积极推进 Spark 开源社区的建设。(Github: gatorsmile)


Apache Spark & Databricks

Apache Spark 自 2009 年诞生与加州大学伯克利分校的 AMPLab 实验室,历经 10 年发展,全球超过 1400 位来自 300 多家企业和组织的工程师为其贡献代码,是大数据处理领域事实上的业界标杆。Databricks 作为 Spark 背后的公司,大名鼎鼎的“砖厂”,可谓阵容豪华,包括了 UC Berkeley 计算机教授、AMPLab 联合创始人 Ion Stoica,UC Berkeley 计算机科学教授 Scott Shenker(Scott计算机历史上论文被引用次数最高的人,同时也是知名SDN公司Nicira的联合创始人及前CEO),Spark 原作者、Stanford 教授 Matei Zaharia。在不断完善 Spark 的同时,还陆续推出了 Mlflow、Delta 等重磅产品。

本次分享着重介绍了 Spark 生态圈的最新动向。Delta Lake 这个最新的Spark Data Source 是如何解决 Spark 的各种痛点;以及 Spark 3.0 预览版的众多特性。特别是,深入讲解了 Dynamic Partition Pruning 和 Adaptive Query Execution 优化是如何让 Spark 更加容易使用并且快速执行。

Topic 1: Delta

传统 Lambda 架构系统局限性太大,为了解决如下问题:

  • 同时读写,并且要保证数据的一致性
  • 可以高吞吐从大表读数据
  • 遇到错误写出可以回滚和删改
  • 在线业务不下线的同时可以重新处理历史数据
  • 处理迟到数据而无需推迟下阶段的数据处理

Databricks推出了全新的Delta架构(Structured Streaming + Delta Lake):

Delta架构的特性:

  • 持续的数据流入和处理 [再无 Lambda 架构的批流分离]
  • 物化中间结果来改善可靠性和方便故障排查
  • 基于用户使用场景和商业需求做费用与延迟的取舍
  • 根据查询的常用模式来优化数据的物理存储
  • 历史数据的再处理只需要删除结果 table 重启流处理
  • 通 过 调 整 schema management,data expectations and UPDATE/DELETE/MERGE 来一步一步地改善数据 质量,直到数据可以被用于分析

Delta架构的优势:

  • 减少端到端的 pipeline SLA,多个使用单位把 data pipeline 的 SLA 从小时减少到分钟级别
  • 减少 pipeline 的维护成本,避免了为了达到分钟级别的用例延迟而引入 lambda 架构
  • 更容易地处理数据更新和删除,简化了 change data capture, GDPR, Sessionization, 数据去冗
  • 通过计算和存储的分离和可弹缩从而降低了 infrastructure 的费用,多个使用单位将 infrastructure 的费用降低了超过十倍
Topic 2: Spark 3.0 

Spark 3.0 预览版引入了包括 Dynamic Partition Pruning,Adaptive Query Execution 等众多新特性,而这些都离不开整个社区以及 Spark 背后的大厂们的共同努力与贡献。

Dynamic Partition Pruning:Spark 2.x 查询引擎的 CBO 的实际效果并不理想,主要是因为以下 3 个问题,

  • Missing Statistics (一次性计算 ETL workloads)
  • Out-of-date Statistics (存储与计算分离)
  • Misestimated Costs (多样部署环境 + UDFs)

对于 Query Optimization,在 Rule 和 Cost 的基础上,Spark 3.0 中新增了 Runtime 级别的 Dynamic Partition Pruning,以此下图查询为例, left table 上的 column filter 无法作用于 right table,因此 right table 还是需要 scan 大量数据,而事实上 join 的计算过程中只用到了 right table 的少量数据。如何去避免这种情况呢?答案就是 Dynamic Partition Pruning,通过在运行时将 left table pushdown 之后的结果 reuse,为 right table 产生一个新的 filter,由此大幅减少 table scan,带来性能上的巨大提升。

Adaptive Query Execution:通过动态的 Statistics 来做优化,在执行过程中,产生执行的部分 RDD 时,可以把这部分的 Statistics 结果收集起来,为后续计算重新做优化,产生新的更优的执行计划。以下图查询为例,根据 Runtime Statistics,将 Sort Merge Join 优化成了 BroadCast Hash Join,由此大大提升了性能。但是 Adaptive Query Execution 默认是关闭的,因为对于小查询这个优化可能会带来额外的消耗,以致性能反而变差。

Q & A

Q: Delta Lake 中乐观并发控制如何判断 who win when conflicts among transactions?

A: 这些判断会依赖文件系统自身的原子性,在不同的环境中有不同的 commit service,比如 HDFS 本身就支持文件操作的原子性,所以对应的 commit service 可以通过直接调用 HDFS 的 api 的方式来进行操作。

Q: Delta Lake 主要解决了读多写少的场景,我们曾针对 Kyligence Enterprise 的用户做过一系列调研,结果显示,存在不少需要一个写比较密集,而读也需要很高的性能的方案的场景,对于这种场景你们是怎么看待的?

A: Delta Lake 的确更适合少写多读的场景,我们也是做了取舍,具体情况还是要针对性地去做读或者写的优化。比如,将来我们可以稍微牺牲部分读性能的情况下去做写的优化时,就可以将写产生的 change 保存在额外的 file 中,这样就是写得更快,但是每次需要多读一个 file,并定时地合并。

Q:修改历史版本数据,Delta Lake 能否支持?更新数据的成本如何?(Kyligence Enterprise 的用户偶尔会有这种需求:对于历史上已经计算完成的报表数据,用户偶然会发现历史数据中有少量错误数据,这导致计算出的 cube 数据也有偏差,此时用户修改了原始数据后,就想对 cube 数据进行更正。Kyligence Enterprise 中想要完成数据修改,需要具备2个条件:1,历史数据存在;2,能确认错误数据所属时间段,由此可以对数据进行 refresh。但是计算量取决于包含错误数据的 cube segment 数据量的大小)

A:Delta Lake 中支持 Update,但是影响到的数据范围会由业务来判断,可能影响很大,取决于影响到的 file 数和 file的大小。

Q: Spark 是如何让业务知道自己是否适合这个平台,或者说如何让用户能轻松地意识到这个查询是否适合?

A: 我们 Databricks 会自动地提供 Tips(提示),目前,对于有些业务,的确存在性能达不到或者不合适的情况,但是理论上都是可以做到的,不过有些需要新实现订制的版本来支持;在我们的平台上,当用户的任务运行完成之后,平台自动能给出一些建议(Tips),比如通过开关某个 feature 或者修改部分配置来提升性能。

Q: Spark 中 CBO 效果不好,其中一个原因是 Statistics 的缺失,那很多 ETL 或者数据分析,用户的数据都是按天增量的,这些增量数据的 Statistics 是否能从历史数据中获取?

A: 所有对于具体 workload 或者具体使用环境做出的优化都是能够提升 Spark 的表现,但是 Spark 作为一个普适的查询引擎,很难以一个 general 的方式去解决所有的问题。反过来说,Delta 是另一个外部数据源(date source),因为它是拥有数据的,它有大量的空间可以根据 Statistics 去做优化,对于外部数据源,Delta 需要自己的定制优化去改善查询。是否拥有数据这里是有本质区别的。

Q: Kyligence 目前在根据查询 sql 的执行计划树和一些 Statistics 来预测 Spark job 的最佳配置,由此来降低 Kyligence Enterprise 用户的使用和调优成本,社区是否重视这个?

A: 我们认为这个是很重要的,但是目前很难设计一套普适的方案去解决所有的配置调优问题,根据用户场景的不同,他们会有自己的方案,而且这些调优通常在 Spark 外部的系统(使用 Spark 的系统)去解决。目前 Spark 用户也相对资深,Spark 提供各种接口,用户可以插入定制的调优,像 Linkedin,Facebook, Netflix 都在做类似的事情。

Q: Spark 中 parquet 没有 IO 级别的 index,这是为什么?

A: Spark 作为一个计算引擎,是不拥有数据的,在没有数据特征的情况下去做 index 并没有太大的意义,所以 Spark 不做这些。Delta 里面是做了的,因为 Delta 是拥有数据的。

Q: 使用 Spark 时,如果只对少量表做 Broadcast,的确能提升性能,但是如果同时对多张表进行 Broadcast 会引发 driver 的内存问题,或者 Shuffle statistics 很大很多的情况下,driver 端也会有内存问题,如何解决这种问题?

A: 我们下个 release Spark 3.0 将支持所有 join algorithm 的 Hint。用户可以更方便地选择最适合的算法,来提升自己的查询。driver 端的全局内存管理是需要重构,现在做的各种优化和改善都还没有解决本质问题。 

Q: Structured Streaming 持续数据流的 limitation 解决地如何?

A: 由于超低延时的持续数据流的需求不是那么大,目前流处理团队在解决流处理的最痛的痛点,比如大量小文件和读写原子性,因此主要投入在 Delta 的研发,持续数据流暂时没有投入过多的资源。