【技术帖】如何优化Kylin的Cube构建性能

史少锋
2017年 2月 14日

Kylin将Cube的构建分成若干子任务,然后由任务引擎依次执行。这些子任务包括Hive操作,MapReduce任务,以及HBase操作等 。通常一次构建耗时几分钟到几小时,取决于数据量、模型复杂度、集群计算能力、配置等多方面因素。当用户有很多个Cube需要频繁构建时,了解Cube构建并有针对地进行优化就显得尤为必要。


作者简介

史少锋,Kyligence技术合伙人&资深架构师,Apache Kylin核心开发者和项目管理委员会成员(PMC),专注于大数据分析和云计算技术。曾任eBay全球分析基础架构部大数据高级工程师,IBM云计算部门软件架构师。


Apache Kylin介绍

Apache Kylin是一个开源的分布式查询引擎,在Apache Hadoop之上提供标准SQL的查询接口, 支持对TB到PB的数据量级以秒级别的低延迟进行查询,从而使得大数据的交互式分析变成可能。Kylin于2014年开源并加入Apache孵化器,2015年毕业成为Apache独立顶级项目,迄今在诸多行业和公司得到广泛使用。此外Kylin也是首个完全由中国工程师开发并贡献到Apache基金会的项目。


小贴士

如果您还不了解Kylin的话,请访问项目主页https://kylin.apache.org 。

如何优化

图1. Apache Kylin架构图

Kylin与其它大数据查询引擎的主要区别在于,Kylin对数据的计算分为预计算和后计算两个阶段:预计算根据用户建立的模型,通过一系列离线任务将原始数据加工成多维立方体(Cube);后计算将用户的SQL查询转换成对Cube的即时查询, 再处理后得到最终的结果。因为很大一部分计算工作已经在预计算阶段完成,后计算往往能在亚秒级完成,从而对用户的查询请求可以很快做出响应,并且当数据量成倍增长的时候,依然可以保持近乎0(1)的超低时间复杂度。

Kylin将Cube的构建分成若干子任务,然后由任务引擎依次执行。这些子任务包括Hive操作,MapReduce任务,以及HBase操作等 。通常一次构建耗时几分钟到几小时,取决于数据量、模型复杂度、集群计算能力、配置等多方面因素。当用户有很多个Cube需要频繁构建时,了解Cube构建并有针对地进行优化就显得尤为必要。本文结合Apache Kylin 1.6.0,按子任务的执行次序,介绍Kylin中Cube构建(也即预计算 )的原理以及可优化的手段。

Step 1. Create Intermediate Hive Table (创建临时的 Hive 平表)

作为构建的第一步,Kylin从Apache Hive中将源数据抽取出来并插入到一张临时创建的表中,后续的处理将以这张表为输入,待Cube构建完成后,再将其删除。抽取时只选择Cube模型中用到的列,如果模型是按某个日期/时间列做分区的,还会将时间条件应用在创建平表的Hive命令中。下面是一个创建平表的示例语句:

hive -e “USE default;
DROP TABLE IF EXISTS kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34;

CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34
(AIRLINE_FLIGHTDATE date, AIRLINE_YEAR int, AIRLINE_QUARTER int, …, AIRLINE_ARRDELAYMINUTES int)
STORED AS SEQUENCEFILE
LOCATION ‘hdfs:///kylin/kylin200instance/kylin-0a8d71e8-df77-495f-b501-03c06f785b6c/kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34’;
SET dfs.replication=2;
SET hive.exec.compress.output=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=100000000;
SET mapreduce.job.split.metainfo.maxsize=-1;

INSERT OVERWRITE TABLE kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34 SELECT
AIRLINE.FLIGHTDATE, AIRLINE.YEAR, AIRLINE.QUARTER,…, AIRLINE.ARRDELAYMINUTES
FROM AIRLINE.AIRLINE as AIRLINE
WHERE (AIRLINE.FLIGHTDATE >= ‘1987-10-01’ AND AIRLINE.FLIGHTDATE < ‘2017-01-01’);”

在执行INSERT INTO 的Hive语句时,Kylin会应用出现在conf/kylin_hive_conf.xml中的配置参数和值,例如使用较少的HDFS复本(dfs.replication=2)、启用Hive的mapper内连接(hive.auto.convert.join.noconditionaltask=true)等。如果您的环境需要其它有益于Hive执行的参数,添加在这个配置文件中即可对所有Cube生效;如果您想为不同Cube设置不同的Hive参数值,可以在创建Cube的“参数覆盖”页,使用前缀kylin.hive.config.override.进行配置。

图二:在Cube上覆盖Hive配置参数

在此例中,Cube是按FLIGHTDATE 列分区的,因而构建时Kylin会将用户选择的时间范围作为where过滤条件,让Hive抽取只在此时间范围内的数据。如果此时间列,正好也是Hive表的分区列的话,Hive可以智能地利用此条件只扫描对应分区内的文件,从而减少读取文件的数量。因此,当您的源数据表是按日期增量导入时,建议对Hive表按日期列做分区,并且在Kylin的数据模型中,使用相同的列做为Cube的分区列。

此外,如果Hive中默认开启了自动合并小文件的话,可以在conf/kylin_hive_conf.xml添加以下配置项以关闭自动合并,因为Kylin会有自己的方式来合并文件(下一步会介绍):

<property>
<name>hive.merge.mapfiles</name>
<value>false</value>
<description>Disable Hive’s auto merge</description>
</property>

Step 2. Redistribute Intermediate Table(重分布数据)

在第一步完成后,Hive将抽取出的数据生成在指定的HDFS目录下;仔细观察发现,有的文件比较大,有的文件比较小甚至是空的。文件大小的不均衡,会导致后续的MR任务也不均衡:有的Map分到的文件块较大所以耗时较长,而有的分到的很小所以耗时较短,从而形成“木桶效应”。为消除这种不均衡,Kylin需要对临时表的数据做一次重新分布,希望借助这次重分布使得文件块的大小适中且基本一样大。下面是一次重分布的例子:

total input rows = 159869711
expected input rows per mapper = 1000000
num reducers for RedistributeFlatHiveTableStep = 160

Redistribute table, cmd:

hive -e “USE default;
SET dfs.replication=2;
SET hive.exec.compress.output=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=100000000;
SET mapreduce.job.split.metainfo.maxsize=-1;
set mapreduce.job.reduces=160;
set hive.merge.mapredfiles=false;
INSERT OVERWRITE TABLE kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34 SELECT * FROM kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34 DISTRIBUTE BY RAND();


Kylin通过查询Hive获得临时表的行数,基于此行数计算出需要重分布的文件块数量。默认地Kylin会为每100万行数据分配一个文件。在此例中,临时表有接近1.6亿行数据,所以预计需要160个文件。通过在执行INSERT INTO … DISTRIBUTE BY 的重分布语句前设置“mapreduce.job.reduces=N”的方式,使用指定数量的reducer来接收数据;按照每个reducer写一个文件的惯例,从而得到指定数量的文件。在后续以此临时表为输入的MR处理中,Hadoop会启动与文件数相同的Map(通常100万行数据的大小不超过一个HDFS文件块)。通过这一步,既合并了小文件,也消除了木桶效应。

每个文件100万行的配置是一个经验值;如果您的数据规模较小,或Hadoop集群较大,可以分割更多的文件以获得更高的并发性:在conf/kylin.properties中设置特定参数,如:

kylin.job.mapreduce.mapper.input.rows=500000
多数情况下,Kylin会让Hive随机地重分布数据;这时的重分布语句是 “DISTRIBUTE BY RAND()”。因为是随机,它会让数据分布的非常均匀,但是毫无规律。

如果您的Cube指定了一个“shard by”列的话,Kylin会让Hive按此列的值做重分布,而不再是随机分布。“Shard by”列的选择是在Cube的高级设置页,如图二所示,此列需要是一个高基数列如“USER_ID”)此时的重分布语句会变成:”DISTRIBUTE BY USER_ID”。

图三:选择Shard by列

按高基数维度列做重分布的好处是,具有相同此列值的行,将被归拢在同一个文件,从而非常利于后续的聚合计算:大量的聚合在本地完成,减少了数据的交叉传递。在一个典型场景中,这一优化使得Cube构建的时间减少了40%。

请注意
1) “shard by”列必须是一个高基数列,并且它会出现在很多cuboid中(而不是仅仅出现在少数cuboid中);
2)此列在所有时间区间均有很多值可供均匀分布;典型的例子有: “USER_ID”, “SELLER_ID”, “PRODUCT”, “CELL_NUMBER”等,基数至少在数千以上。
3) 仅支持选择一个列做为”shard by”。
4) 使用”shard by” 对于Cube的存储和查询也有益处,这里不做讨论。

Step 3. Extract Fact Table Distinct Columns (抽取各维度的不同值)
在此步骤中,Kylin启动一个MR任务来获取各个维度在临时表中出现的不同值,用于构建维度字典(dictionary)。

实际上此步骤做了更多事情:它模拟Cube的构建并使用HyperLogLog计数器来估算每个cuboid的行数,从而收集Cube的统计数据。这些统计数据将在后续的任务中辅助Kylin 采取不同的策略和配置,而且还可以帮助用户后续分析等。
在这一步中如果您发现Map任务非常慢,通常它说明Cube的设计过于复杂,如有太多维度、太多组合等,请参考 [优化Cube设计] (https://kylin.apache.org/docs16/howto/howto_optimize_cubes.html) 来为Cube瘦身。如果Reduce任务总是报OutOfMemory 错误,说明Cube组合数过多,或者默认的YARN内存分配过少。如果通过调整集群设置等方法依然不能在合理时间内完成此步骤,那么您可以及早停止并深入调查问题,因为后续的构建将比这一步更加耗时。

当然您还可以通过修改配置来降低此步的数据采样率,从而使得效率提升,但并不推荐这样做因为它会降低统计的准确性.

Step 4. Build Dimension Dictionary(构建维度字典)

在上一步完成后,每个采用字典编码的维度,在此临时表中出现的不同值将被抽取在单个文件中。在此步中,Kylin会以读取这些值并在任务引擎节点的内存中构建字典(从下一版开始Kylin会将此任务移至MR)。通常此步骤会在数秒中完成;但是如果有超高基数(大于一百万)维度­,Kylin可能会报”Too high cardinality is not suitable for dictionary”的信息,建议用户对超高基数维度采用非字典编码,如”fixed_length”, “integer”等。对编码的设置也是在Cube的“高级设置”页完成,参考图三。

Step 5. Save Cuboid Statistics(保存Cuboid统计信息)

将采集的各个cuboid的统计信息保存至Kylin的元数据存储,供后续使用。

Step 6. Create HTable(创建HTable)

依据统计信息为当前构建的segment创建HBase table; 这一步以及上一步都是轻量级任务故很快可以完成。

Step 7. Build Base Cuboid(构建Base Base Cuboid)

这是“逐层”(by-layer) Cube构建的第一轮MR任务,它以临时表(即图四中的“full data”)为输入,计算包含所有维度的组合(称为base cuboid),也就是下图中的4-D cuboid。

图四:逐层构建四维Cube的过程

此任务的map数等于步骤2的reduce数量, 此任务的reduce数量是基于Cube统计出的此任务的output大小而估算的:默认为每500MB的输出分配一个reduce;如果您观察到此步骤的reduce数量较小,可以考虑在conf/kylin.properties中修改默认参数以获得更多reduce同时运行, 如:

kylin.job.mapreduce.default.reduce.input.mb=200

如果配置某些MR参数可以对任务有帮助的话,您可以将参数添加到conf/kylin_job_conf.xml中以供所有”逐层”构建任务所使用;如果需要为不同Cube设置不同MR参数值,可以在创建Cube的“参数覆盖”页,使用前缀kylin.job.mr.config.override.进行配置,如:

图五:在Cube上覆盖MR配置参数

Step 8. Build N-Dimension Cuboid(构建N-维Cuboid)

这是“逐层” Cube构建的其它轮MR任务,每一轮以上一轮的输出为输入,然后减去一个维度生成新的组合,并在reduce中聚合以生成子Cuboid。例如,以ABCD组合为输入,减去维度A生成BCD组合,减去维度B生成ACD组合。

有些维度组合可以从多个父亲组合来生成,在这种情况下,Kylin会选择ID值“最小”的那个父亲组合来生成。例如,组合AB可以从ABC(id: 1110)生成,也可以从ABD(id: 1101)生成,由于1101 < 1110, 所以ABD会被用来生成AB。基于此规则,如果维度D的基数很小,那么此时的聚合量将会很少,也就更加高效。图六介绍了一个四维度Cube的Cuboid生成树。

图六:四维Cube的生成树

因此,当您设计Cube的时候,在高级设置页面,请将低基数维度,尽量拖动放置在高基数维度的后面 。这个原则不仅会使得Cube构建更快,也会使得查询时的后聚合性能更好 。

通常从N-维到 (N/2)-维的计算比较慢,因为这是一个数据爆炸的过程:N-维有1个 Cuboid,(N-1)-维有 N个 cuboid, (N-2)-维 有N*(N-1)个 cuboid,以此类推。在 (N/2)-维之后,组合数是一个相反的过程,并且每个cuboid的行数较少,因此每步的时间逐渐缩短。

Step 9. Build Cube(使用in-mem算法构建Cube)

这一步是采用Kylin的“in-mem” 算法来构建Cube,它仅需一轮MR即可完成所有组合的计算,但它需要使用较多的计算资源。Kylin目录下的 conf/kylin_job_conf_inmem.xml配置文件就是为这一步任务而添加的;默认它会为每个Map任务向Hadoop请求3GB内存;如果您的集群内存资源较充足,可以修改配置以分配更多内存,这样Kylin在计算时可以更多地使用内存来缓存数据,减少磁盘的读写,从而提升构建性能;例如:

<property>
<name>mapreduce.map.memory.mb</name>
<value>6144</value>
<description></description>
</property>

<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx5632m</value>
<description></description>
</property>

如果需要为不同Cube设置不同MR参数值,也可以使用与图五相同的方法。

请注意:“in-mem” 算法和“by-layer”算法各有优劣;在有些情况下“in-mem”算法反而会慢。通常不建议用户自行设置算法,Kylin会根据Cube的统计信息选择更合适的算法;没有选中的算法的步骤会被跳过。

Step 10. Convert Cuboid Data to HFile(生成HFile)

此步骤启动一个MR任务,将前面计算好的cuboid文件(sequence文件格式)转换成HBase的HFile格式。Kylin会根据Cube统计信息,计算并预先分配好region;默认会为每5GB的数据分配一个region,然后再为每个region分配合适数量的HFile。MR的reduce数量等于HFile文件的数量。通常region数越多,这一步的并发度就越高;如果您观察到这一步的耗时较长,且reduce数量较少的话,可以在conf/kylin.properties中将下列参数的值设置的更小以获得更好性能,例如:

kylin.hbase.region.cut=2
kylin.hbase.hfile.size.gb=1

如果不确定您的集群中HBase合适的region大小,可以咨询HBase管理员。

Step 11. Load HFile to HBase Table(加载HFile到HBase)

在这步中Kylin使用 HBase API将生成好的 HFile通过BulkLoad的方式加载到HBase的 region server中;通常这一步很快即可完成。

Step 12. Update Cube Info(更新Cube信息)

在Cube数据加载到HBase后,Kylin将此次构建的Cube segment的状态,从”NEW”更新成”READY”,标记此segment可以供查询引擎使用。

Step 13. Cleanup(清理)

这是收尾的步骤,会删除第一步生成的临时表;到这里它已经不会影响其它任何任务,因为segment已经变成了READY状态。即便此步骤出错也不用担心,其后当运行[StorageCleanupJob] (https://kylin.apache.org/docs16/howto/howto_cleanup_storage.html)的时候,垃圾文件会被清。

小结

关于Kylin的优化有很多手段和技巧,需要理论结合实际多多练习。如果您有经验想要分享,欢迎订阅并参与Kylin开发群组的讨论: [dev@kylin.apache.org](mailto:dev@kylin.apache.org).