如何在 Kylin 中优雅地使用 Spark

王汝鹏
Kyligence 大数据研发工程师
2019年 8月 31日

前言

Kylin 用户在使用 Spark的过程中,经常会遇到任务提交缓慢、构建节点不稳定的问题。为了更方便地向 Spark 提交、管理和监控任务,有些用户会使用 Livy 作为 Spark 的交互接口。在最新的 Apache Kylin 3.0 版本中,Kylin 加入了通过 Apache Livy 递交 Spark 任务的新功能[KYLIN-3795],特此感谢滴滴靳国卫同学对此功能的贡献。

Livy 介绍

Apache Livy 是一个基于 Spark 的开源 REST 服务,是 Apache 基金会的一个孵化项目,它能够通过 REST 的方式将代码片段或是序列化的二进制代码提交到 Spark 集群中去执行。它提供了如下基本功能:

  • 提交 Scala、Python 或是 R 代码片段到远端的 Spark 集群上执行。
  • 提交 Java、Scala、Python 所编写的 Spark 作业到远端的 Spark 集群上执行。

Apache Livy 架构

为什么使用 Livy

1. 当前 Spark 存在的问题

Spark 当前支持两种交互方式:

  • 交互式处理用户使用 spark-shell 或 pyspark 脚本启动 Spark 应用程序,伴随应用程序启动的同时,Spark 会在当前终端启动 REPL(Read–Eval–Print Loop) 来接收用户的代码输入,并将其编译成 Spark 作业。
  • 批处理批处理的程序逻辑由用户实现并编译打包成 jar 包,spark-submit 脚本启动 Spark 应用程序来执行用户所编写的逻辑,与交互式处理不同的是批处理程序在执行过程中用户没有与 Spark 进行任何的交互。

两种方式都需要用户登录到 Gateway 节点上通过脚本启动 Spark 进程,但是会出现以下问题:

  • 增加 Gateway 节点的资源使用负担和故障发生的可能性。
  • 同时 Gateway 节点的故障会带来单点问题,造成 Spark 程序的失败。
  • 难以管理、审计以及与已有的权限管理工具的集成。由于 Spark 采用脚本的方式启动应用程序,因此相比于 WEB 方式少了许多管理、审计的便利性,同时也难以与已有的工具结合,如 Apache Knox 等。
  • 将 Gateway 节点上的部署细节以及配置不可避免地暴露给了登陆用户。

2. Livy 优势

一方面,接受并解析用户的 REST 请求,转换成相应的操作;另一方面,它管理着用户所启动的所有的 Spark 集群。

Livy 具有如下功能:

  • 通过 Livy session 实时提交代码片段与 Spark 的 REPL 进行交互。
  • 通过 Livy batch 提交 Scala、Java、Python 编写的二进制包来提交批处理任务。
  • 多用户能够使用同一个服务器(支持用户模拟)。
  • 能够通过 REST 接口在任何设备上提交任务、查看任务执行状态和结果。

Kylin with Livy

1. 引入 Livy 之前 Kylin 是如何使用 Spark 的

Spark 是在 Kylin v2.0 引入的,主要应用于 Cube 构建,构建过程介绍可以查看:https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/

下面是 SparkExecutable 类的 doWork 方法关于提交 Spark job 的一段代码,我们可以看到 Kylin 会从配置中获取 Spark job 包的路径(默认为 $KYLIN_HOME/lib),通过本地指令的形式提交 Spark job,然后循环获取 Spark job 的执行状态和结果。我们可以看到 Kylin 单独开了一个线程在本地向 Spark 客户端发送来 job 请求并且循环获取结果,额外增加了节点系统压力。

@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
    //略...
    String jobJar = config.getKylinJobJarPath(); //获取job jar的路径
    //略...
    final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); //构建本地command
    //略...
    //创建指令执行线程
    Callable callable = new Callable<Pair<Integer, String>>() {
        @Override
        public Pair<Integer, String> call() throws Exception {
            Pair<Integer, String> result;
            try {
                result = exec.execute(cmd, patternedLogger);
                } catch (Exception e) {
                logger.error("error run spark job:", e);
                result = new Pair<>(-1, e.getMessage());
            }
            return result;
        }
    };
    //略...
    try {
        Future<Pair<Integer, String>> future = executorService.submit(callable);
        Pair<Integer, String> result = null;
        while (!isDiscarded() && !isPaused()) {
            if (future.isDone()) {
                result = future.get(); //循环获取指令执行结果
                break;
            } else {
                Thread.sleep(5000); //每隔5秒检查一次job执行状态
            }
        }
    //略...
    } catch (Exception e) {
        logger.error("Error run spark job:", e);
        return ExecuteResult.createError(e);
    }
    //略...
}

2. Livy for Kylin 详细解析

Livy 向 Spark 提交 job 一共有两种,分别是 Session 和 Batch,Kylin 是通过 Batch 的方式提交 job 的,需要提前构建好 Spark job 对应的 jar 包并上传到 HDFS 中,并且将配置项 kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar 加入到 kyiln.properties 中。

Batch 一共具有如下九种状态:

public enum LivyStateEnum {
    starting, running, success, dead, error, not_started, idle, busy, shutting_down;
}

下面是 SparkExecutableLivy 类的 doWork 方法和 LivyRestExecutor 类的 execute 方法关于提交 Spark job 的一段代码,Kylin 通过 livyRestBuilder 读取配置文件获取 Spark job 的包路径,然后通过 restClient 向 Livy 发送 Http 请求。在提交 job 之后会每隔 10 秒查询一次 job 执行的结果,直到 job 的状态变为 shutting_down, error, dead, success 中的一种。每一次都是通过 Http 的方式发送请求,相比较于通过本地 Spark 客户端提交任务,更加稳定而且减少了 Kylin 节点系统压力。

@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
    //略...
    livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job);
    executor.execute(livyRestBuilder, patternedLogger); //调用LivyRestExecutor类的execute方法
    if (isDiscarded()) {
        return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
    }
    if (isPaused()) {
        return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
    }
    //略...
}

public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) {
    LivyRestClient restClient = new LivyRestClient();
    String result = restClient.livySubmitJobBatches(dataJson); //向Livy发送http请求
    JSONObject resultJson = new JSONObject(result);
    String state = resultJson.getString("state"); //得到Livy请求结果
    final String livyTaskId = resultJson.getString("id");
    while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state)
            && !LivyStateEnum.error.toString().equalsIgnoreCase(state)
            && !LivyStateEnum.dead.toString().equalsIgnoreCase(state)
            && !LivyStateEnum.success.toString().equalsIgnoreCase(state)) {
        String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); //获取Spark job执行状态
        JSONObject stateJson = new JSONObject(statusResult);
        if (!state.equalsIgnoreCase(stateJson.getString("state"))) {
            logAppender.log("Livy status Result: " + stateJson.getString("state"));
        }
        state = stateJson.getString("state");
        Thread.sleep(10*1000); //每10秒检查一次结果
    }
}

3. Livy 在 Kylin 中的应用

构建 Intermediate Flat Hive Table 和 Redistribute Flat Hive Table 原本都是通过 Hive 客户端(Cli 或 Beeline)进行构建的,引入 Livy 之后,Kylin 通过 Livy 来调用 SparkSQL 进行构建,提高了平表的构建速度。在引入 Livy 之后,Cube 的构建主要改变的是以下几个步骤,对应的任务日志输出如下:

  • 构建 Intermediate Flat Hive Table
  • 构建 Redistribute Flat Hive Table
  • 使用 Spark-Submit 的地方都用 Livy 的 Batch API 进行替换

1)构建 Cube

2)转换 Cuboid 为 HFile

4. 引入 Livy 对 Kylin 的好处

  • 无需准备 Spark 的客户端配置,Kylin 部署更加轻量化。
  • Kylin 节点系统压力更低,无需在 Kylin 节点启动 Spark 客户端。
  • 构建 Flat Hive Table 更快,通过 Livy 可以使用 Spark SQL 构建平表,而 Spark SQL 要快于 Hive。
  • 提交 job 更快,job 状态获取更方便。

5. 如何在 Kylin 中启用 Livy

在 Kylin 启用 Livy 前,请先确保 Livy 能够正常工作

1)在 Kylin.properties 中,加入如下配置,并重启使之生效。

//此处为CDH5.7环境下的配置
kylin.engine.livy-conf.livy-enabled=true
kylin.engine.livy-conf.livy-url=http://cdh-client:8998
kylin.engine.livy-conf.livy-key.file=hdfs:///path/kylin-job-3.0.0-SNAPSHOT.jar
//请根据个人环境替换对应版本的包
kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-server-1.2.0-cdh5.7.5.jar,hdfs:///path/htrace-core-3.2.0-incubating.jar,hdfs:///path/metrics-core-2.2.0.jar

其中 livy-key.file 和 livy-arr.jars 地址之间不要有空格,否则可能会出不可预知的错误。

2)Cube 构建引擎选用 Spark。

常见问题

以下问题往往为使用不当和配置错误的原因,非 Kylin 本身存在的问题,此处仅为友情提示。

1. Table or view not found

输出日志:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: `DEFAULT`.`KYLIN_SALES`; line 21 pos 6;

解决方法:

//将hive-site.xml拷贝到spark的配置文件目录中
ln -s /etc/hive/conf/hive-site.xml $SPARK_CONF_DIR

2. livy request 400 error
解决方法:

//kylin.properties Livy配置项jar包地址之间不要留空格
//此处为CDH5.7环境下的依赖包,请根据个人环境替换对应版本的包
kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/

3. NoClassDefFoundError
输出日志:

NoClassDefFoundError: org/apache/hadoop/hbase/protobuf/generated/HFileProtos

解决方法:

find /opt -type f -name "hbase-protocol*.jar"
cp /path/to/hbase-protocol-1.2.0-cdh5.7.5.jar $SPARK_HOME/jars

4. livy sql 执行错误
解决方法:

//kylin.properties中添加如下配置
kylin.source.hive.quote-enabled=false

总结

Livy 本质上是在 Spark 上的 REST 服务,对于 Kylin cube 的构建没有本质上的性能提升,但是通过引入 Livy,Kylin 能够直接通过 Spark SQL 代替 Hive 构建 Flat Table,而且管理 Spark job 也更加方便。但是,Livy 当前也存在一些问题,比如使用较低或较高版本的 Spark 无法正常工作以及单点故障等问题,用户可以考虑自身的实际场景选择是否需要在 Kylin 中使用 Livy。

参考文章

  1. https://hortonworks.com/blog/livy-a-rest-interface-for-apache-spark/
  2. https://wiki.apache.org/incubator/LivyProposal
  3. https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/

作者简介:王汝鹏,Kyligence 大数据研发工程师,主要负责 Apache Kylin 社区维护和开发。GitHub:https://github.com/rupengwang。