Kylin on Parquet 介绍和快速上手

王汝鹏
Kyligence 大数据研发工程师
2020年 4月 27日

Apache Kylin on Apache HBase 方案经过长时间的发展已经比较成熟,但是存在着一定的局限性。Kylin 查询节点当前主要的计算是在单机节点完成的,存在单点问题。而且由于 HBase 非真正列存的问题,Cuboids 信息需要压缩编码,读取 HBase 数据的时候再反序列化、分割,额外增加了计算压力。另外,HBase 运维难度比较大,不便于上云。面对以上问题,Kyligence 推出了 Kylin on Parquet 方案。下文中,Kyligence 的大数据研发工程师王汝鹏讲解了 Kylin on Parquet 解决方案的架构、原理以及如何开发调试代码。

本文主要包括以下几方面的内容:首先会给大家介绍架构设计,然后说明一下我们为什么会去做 Kylin on Parquet,接下来会介绍一下全新的构建和查询引擎以及相比较于 Kylin 3.0 的性能表现,最后有一个现场演示 Demo,给大家介绍一下产品的使用和代码调试方法。

01

架构

Apache Kylin 很早就被设计成了可插拔的架构,基于这种架构我们就可以很方便的去替换某个模块而不会影响其他模块。

Architecture - Apache Kylin

Kylin on Parquet 也是在 Kylin 原来架构的基础上实现了新的查询、构建引擎和存储模块。通过 Spark 实现的查询引擎,能够提交计算任务到 Yarn 上,实现分布式的处理。

Architecture - Kylin on Parquet

Cube 构建这边也是完全通过 Spark 进行处理,不再支持 MapReduce 构建。

数据源现在支持 Hive 和本地 CSV 数据源,目前可以摆脱沙箱的限制,通过本地的 CSV 数据源搭建一个调试环境。

存储层去掉了 HBase,最终构建完成的 Cube 数据都是通过 Parquet 的形式直接存储在文件系统中。

02

为什么是 Kylin on Parquet?

首先,原来 Kylin 依赖 HBase 的架构在查询的时候会存在单点问题,因为一次查询任务在通过 Coprocessor 获取到数据之后的处理是在查询结点单机上完成的。

HBase 不是一个真正的列式存储,它通过 RowKey 来保留每一行的数据,之所以称之为“列式”,是因为它通过列族的结构管理列数据,何为真正列式存储,可以通过下面文章了解更多:https://en.wikipedia.org/wiki/Column-oriented_DBMS。

我们可以看到下面Cube逻辑视图中,Kylin 3.0 及以前对于 Cube 是通过将所有的维度和度量分别压缩成一列进行存储的,这样在查询的时候还需要对这一列进行反序列化、分割等操作,额外增加了计算压力。

Kylin on HBase - Limitations

最后,HBase 比较难于维护,运维难度比较高。

查询过程主要就是 Calcite 会将 SQL 解析成一棵物理执行计划树,其中的计算逻辑的代码都是通过 Calcite 生成的,这些代码会比较难于调试和定位问题。

Kylin on Parquet 目前能够通过 Spark 进行分布式的查询,我们对 Calcite 生成的执行计划做了一层转换,转换成了 Spark 的执行计划,其中每一层的处理的数据我们都是能够通过添加断点查看的。

现在查询相关的逻辑代码也是比较方便调试的,比如我们怀疑在聚合(Agg)这一层出了问题,我们就可以在 Agg 这一步添加断点,查看一下数据是不是符合我们的期望。

存储这边我们替换成了 Parquet,所有的维度和度量会按照每一列进行存储,后面对于存储的结构也会有更加详细的介绍。

03

Cube 构建与查询

 1. 构建引擎

接下来给大家介绍一下全新的构建引擎以及其中的功能是怎么实现的。

1)关键特性

Cube Build- Key Features

以下是关键的特性:

  • 构建引擎完全的通过 Spark 进行处理,中间的所有流程都能够在 SparkUI 上监控到。如果构建过程出现了问题,也能够在 SparkUI 上查看任务的执行情况。
  • 构建引擎加入了自动调参的功能,这个主要是针对用户没有手动去配置 Spark 参数的情况下,根据构建任务量的情况去调整 Spark 相关的参数,这样能更高效地去执行任务。
  • 构建引擎实现了全局字典的分布式构建。
  • 加入了自动恢复失败任务的功能,当任务失败之后,构建引擎会分析当前任务失败的原因, 然后根据不同失败的情况执行不同处理的策略。

2)接口设计

Cube Build - Interfaces

分享的开头里,我提到了 Kylin 可插拔式的架构设计,所以上层实现的接口从 AbstractExecutable 到 CubingJob 都是 Kylin 原有的接口,通过调用 SparkCubingJob 的 create 方法可以提交一个构建 Segment 的任务,然后接下来我们抽象出来了两个步骤,一是资源探测,二是构建 Cube。这两步后面也会进行更加详细的介绍。最后,这两步会串联起来通过 Spark 任务的方式提交到集群或者本地去执行。

3)步骤

构建步骤包括资源探测和 Cube 构建。资源探测主要做了三件事,首先它会去估算一下当前数据源表的大小,这里也是为了接下来第二步自动调参准备的,第三点是构建全局字典。

Cube 构建这一步其实和原来的构建引擎整体步骤是差不多的,首先会通过 Spark 创建平表,然后逐层地构建 Cube,接下来通过 Parquet 的形式进行存储,最后再更新一下 Metadata。为什么我们会把这么多处理集合成一个步骤,主要是因为数据主要是通过 Spark 在内存中进行处理,如果再拆分成多步,还需要对中间数据进行持久化等操作,这样处理效率就会打折扣。右图是构建任务在前端的执行情况。

4)自动调参

Adaptively adjust Spark parameters

自动调参功能默认是打开的,并且只在集群模式下生效,而且手动配置的优先级要高于自动调整。它会根据数据源的大小等情况,估算一下当前构建任务需要的计算资源,最终调整 Spark 任务中 executor 相关的参数。

5)全局字典

Cube Build - Global Dictionary

全局字典功能相对于 Kylin 3.0 主要有两点提升:能够分布式地处理;不再局限于整数类型最大值的限制。其实当前 Kylin 3.0 是新加入了分布式构建字典的功能的,不过默认还是单机构建的方式。

Cube Build - Global Dictionary

具体步骤如下:

  • 通过 Spark 创建平表和获取对应列的 distinct 值
  • 将数据分配到多个桶中
  • 对每一个桶内的数据进行编码
  • 保存字典文件和 metadata 数据(桶数量和桶的 offset 值)

第一次构建字典的时候会对每个桶内的值从 1 开始编码,在编码完成后再根据每个桶的 offset 值进行一次整体字典值的分配。

Cube Build - Global Dictionary
Cube Build - Global Dictionary

第二次提交 Segment 构建任务的时候,会对每个桶的值进行一次再分配,相对于桶内已有值进行编码,然后根据新的 offset 去更新每个桶内相对于全局的一个字典值。

Cube Build - Global Dictionary
Cube Build - Global Dictionary

磁盘上保存的目录结构如图所示。

Global Dictionary - Storage

6)自动重试

自动重试功能会分析导致构建任务失败的异常或错误,并分别采取不同的处理策略。

  • 当遇到 OutOfMemoryError 的时候,引擎会检查当前 Spark 任务是否开启了 AUTO_BROADCASTJOIN_THRESHOLD 这个参数,这个功能比较容易导致Spark任务出现内存不足的报错,尝试禁用这个功能,然后重新提交构建任务。
  • 如果遇到的是 ClassNotFoundException,构建引擎会直接终止当前任务并抛出异常。
  • 对于其他异常,构建引擎会尝试调整 executor core 的数量和分配内存大小,然后重新提交任务。

此功能的默认重试次数为三次,而且是默认打开的,如果想禁用此功能,可以将 kylin.engine.max-retry-time 设置为 0 或者如任意负数。

7)度量

Cube Build - Measures

构建过程对所有的度量都是会做处理的,具体处理逻辑可以在 CuboidAggregator.scala 文件中查看。由于现在查询引擎还存在一些兼容性的问题,TopN, CountDistinct, Percentile 现在还查不了,但是已经有 issue 在做了。

8)存储

假设我们最终生成的 cuboid 内容如上图所示,存在三个维度和两个度量,对应的 parquet 文件的 schema 就是中间这张图的样子。我们会将所维度名称映射成一个唯一的数字,这样也是为了进一步优化存储。我们可以将 parquet 文件下载到本地,通过 spark 看到当前 parquet 文件,也就是我们保存的 cuboid 文件的 schema 内容。

Cube Build - Storage

磁盘上存储的目录结构如上图所示,所有文件是通过项目来归类的,包括字典,构建产生的临时文件以及构建完成的所有 cuboids。Segment 目录会有一个独立的签名,防止出现写入冲突等问题。

9)性能对比

Cube Build - Performance

我们将新的构建引擎和 Kylin 3.0 的构建引擎(MapReduce)做了一下对比,运行环境是拥有四个计算节点,Yarn 拥有 400G 内存和 128 内核的集群。Spark使用的内部版本,由于我们对 Spark 源码做了一些优化,所以目前并不支持社区版 Spark。测试的数据集是标准的 SSB 数据集。

Cube Build - Performance

左边是最终占用存储空间的大小,新构建引擎存储空间占用能够减少一半。右边是构建时间的对比,也能够看到新构建引擎也比  Kylin 3.0 快了许多。

2. 查询引擎

1)步骤

Query - Steps

一次查询的请求发出后,Calcite 会分析 SQL 并解析成抽象语法树(AST),然后对 AST 进行校验、优化等操作后,再转换成执行计划树(RelNodes)。新查询引擎会将所有的 RelNodes 转换成 Spark 执行计划。最后再通过 Spark 去执行所有的查询任务。

查询引擎会把每一个计算逻辑转换成对应的 Spark 逻辑。转换的这一步其实也做了不少工作,因为 Calcite 有自己的类型,Spark 也有自己的类型,我们需要对其进行处理。Calcite 的一些函数操作也需要做一些对应的实现。

开始的时候也说过了,我们可以在每一个 DataFrame 中添加断点去进行调试,查询中间处理的值,这样能够更加方便的排查问题。查询引擎会在第一次收到查询请求的时候在 Yarn 上创建一个常驻进程,专门用来处理查询任务。

Query - Dependency Isolation

针对查询引擎还做了依赖隔离的处理,主要防止外部依赖类冲突的问题。

2)性能对比

Query - Performance

查询引擎的性能表现也是和 Kylin 3.0 做了一下对比,测试环境和构建性能测试环境是一样的,这里就不赘述了。我们对 SSB 数据集和 TPCH 数据集都做了对比。    

Query - Performance

SSB 数据集规模大概有六千万行,不过 SSB 的标准 SQL 大都比较简单,所有我们看到查询基本上都是一秒内完成的。

Query - Performance

TPCH 数据集规模大概有一千两百万行,TPCH 的标准 SQL 要求更高一些,我们可以看到 Kylin3.0 耗时非常长的查询任务,新的构建引擎的查询能够快很多,因为我们对复杂的查询做了一些优化。

04

Demo

请点击播放下方现场回顾视频,拖动进度条至 26:35 的位置,即可开始观看。

05

规划

TODO List

06

如何体验与贡献

最后也欢迎大家加入我们,目前 Kylin on Parquet 也已经开源出来,对应的文档在 Github 仓库的 wiki 页面也都能看到。大家有问题也可以去 JIRA 上提出来,我们后期会进行修复。最后为了方便大家讨论也可以加一下上图的微信群。