博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Impala支持Google云存储开发笔记
阅读量:2133 次
发布时间:2019-04-30

本文共 11785 字,大约阅读时间需要 39 分钟。

最近给Impala增加了对Google云存储(GCS)的支持(),记录一下开发笔记。希望能给有类似需求的朋友作参考,也欢迎大家给Impala增加阿里云存储(OSS)、腾讯云存储(COS)或其它存储的支持。

Impala是一个SQL引擎,本身并不管理数据,也没有持久化的状态,因此天然地适合存储计算分离的架构,可以很方便地部署到云上。比如Cloudera的CDP Public Cloud产品,就支持在AWS、Azure、GCP中使用Impala。目前Impala支持的存储除了HDFS、Kudu、HBase和本地文件系统(测试用)之外,还支持AWS的S3、Azure的ABFS和ADLS、Google云的GCS等。后面列的这些都是当成Hive表来读写的,因此它们首先要让Hive能支持,其次才是Impala。比如建表时,Impala是发RPC给HMS去建表,需要HMS去对应存储上创建目录等。

在 Impala 中,Java写的FE部分会用HDFS API获取文件的元数据(路径、大小、权限等),C++写的BE虽然调的是libhdfs,但其本质也是通过JNI使用HDFS的Java API。因此只要云存储实现了HDFS API的接口,对接Impala就很简单。下面是整个开发过程的记录。

第一步:了解GCS

GCS是Google云提供的对象存储服务:

GCS提供大部分文件操作的强一制性,只在权限相关的操作上会是最终一致:

GCS实现了HDFS API,不过代码没有放到hadoop repo里,而是单独在一个的repo,使用这个gcs-connector,就能像操作HDFS一样来操作GCS。

接下来是credentials,云上都需要配access token之类的来保证访问的安全性。创建完存储的bucket后,还需要创建个service account并指定必要的角色,用它来访问GCS。对于hdfs应用来说,有两种方式,最简单的就是设置GOOGLE_APPLICATION_CREDENTIALS环境变量,指向service account提供的key file(一个包含credential的json文件)。另外也可以在core-site.xml里加credential信息,参考CDP文档

配置GOOGLE_APPLICATION_CREDENTIALS环境变量的另一个好处是,同时也配好了gsutil的credential。使用gsutil可以直接操作GCS,对debug很有帮助,而且gsutil在上传大量文件以及file-listing上的性能要远高于直接使用hdfs命令。相关操作文档:

第二步:支持建表、读写GCS

配好credential后,在impala开发环境里重启Hive,就可以用Hive建表、读写数据了。接下来开始写impala这边的代码。

自从有了的重构之后,对新增存储系统的改动就集中了很多,FE里主要在FileSystemUtil里,照着其它存储类型的代码加上GCS的就行了。另外HdfsTable#assumeReadWriteAccess()里也需要加上GCS的判断,因为从GCS得到的权限信息实际上是假的(由fs.gs.reported.permissions配置),没必要去读取,在生成计划时假定所有路径都是可读写的就行了。

BE方面的改动可以参照s3的实现,简单 git grep -i s3 be 找下所有s3相关的代码,相应地都有对adls、abfs等的支持代码,加上gcs的就行了。当然有些地方要区别对待,比如file handle cache需要文件系统的InputStream实现CanUnbuffer接口,这个GCS还没有,不能照搬S3。

做完这些改动后就可以用impala操作GCS存储上的表了,接下来是跑测试。

第三步:跑通所有测试

Impala repo里有大量的e2e(即end-to-end)测试,但社区代码提交前只会跑hdfs模式的测试,测不到GCS新加的功能,因此需要自行跑通一下。

3.1 加载测试数据

操作概要:

  • 在minicluster上生成测试数据
  • 上传测试数据到GCS
  • 修改元数据指向GCS目录

目前测试数据只能在hdfs上生成,生成之后可以把整个 test-warehouse 目录copy到GCS上,然后再把HMS里的location URL都改成指向GCS bucket的就行了。

我申请了台GCE上的机器,方便快速上传数据到GCS(测试数据总共有8.7GB)。操作是在impala目录里先用 bin/bootstrap_system.sh 装下依赖,然后直接用 ./buildall.sh -noclean -notests -testdata -format 来编译并生成测试数据。网络需要能访问S3的URL(下载依赖用),机器配置好的话应该一个小时内能跑完。这有几个坑值得提一下:

  • /var/lib/hadoop-hdfs目录的owner需要是当前user,否则minicluster的hdfs无法启动。
  • 检查ulimit -a,如果max user processes比较小(比如只有4096),要把它调大,否则Hive生成数据时Tez作业会挂。可以简单用这个指令调高:
    ulimit -u $(cat /proc/self/limits | grep 'Max processes' | awk '{ print $4 }')
  • 数据加载完之后terminal的显示经常会有问题,输入的指令要敲回车执行时才显示。解决办法是键盘上敲 stty sane 然后回车就能重设正常。

数据在本地hdfs小集群加载完后,我们把它上传到GCS上。一开始我使用hdfs的cp指令,发现速度巨慢无比,可能是因为没有并发,而且GCS可能也没在这个API上做优化。于是换一种方式,先把整个 hdfs:///test-warehouse 目录 get 到本地,然后再用 gsutil 上传,速度就快了很多。指令可参考这个: gsutil -m cp -r test-warehouse gs://quanlong-impala-test-01/

这里-m就是并行执行的意思,具体怎么切分任务gsutil自己会做。quanlong-impala-test-01是我在GCS上创建的bucket名称。

接下来是修改HMS中的元数据指向GCS的目录,这些表的location还是指向minicluster上的路径,都需要改。先把Hive和Impala关掉,然后把postgresql 里HMS的数据dump出来,sed改完后再重导回去。命令参考的是 testdata/bin/load-metastore-snapshot.sh 中的代码。操作如下:

# 关闭Hive和Impalatestdata/bin/kill-hive-server.shbin/start-impala-cluster.py --kill# dump已有原数据并修改locationpg_dump -U hiveuser HMS_home_systest_impala_cdp > HMS_home_systest_impala_cdp.sqlsed -i "s|hdfs://localhost:20500|gs://quanlong-impala-test-01|g" HMS_home_systest_impala_cdp.sql# 重建HMS元数据bin/create-test-configuration.sh -create_metastore # 导入修改后的元数据,并删除hdfs caching相关元数据psql -q -U hiveuser HMS_home_systest_impala_cdp_gcs < HMS_home_systest_impala_cdp.sqlpsql -q -U hiveuser -d ${METASTORE_DB} -c "delete from \"TABLE_PARAMS\" where \"PARAM_KEY\"='cache_directive_id'"psql -q -U hiveuser -d ${METASTORE_DB} -c "delete from \"PARTITION_PARAMS\" where \"PARAM_KEY\"='cache_directive_id'"

最后两行删除hdfs caching相关元数据很重要,因为hdfs caching功能只有在hdfs上能work,在GCS上会导致table metadata loading失败。

做完这些再启动Hive和Impala: testdata/bin/run-hive-server.sh && bin/start-impala-cluster.py

3.2 跑所有e2e和custom cluster测试

Impala的测试很多,包括BE和FE的单元测试,以及e2e tests、custom cluster tests等。其中e2e tests就是端到端测试,即给定查询和数据,然后验证查询的输出;custom cluster tests是非默认配置下的集群测试,每个测试都是按给定参数启动集群,然后跑测试。

和GCS有关的主要是e2e tests和custom cluster tests,测试指令如下:

export BE_TEST=falseexport FE_TEST=falseexport JDBC_TEST=falsebin/run-all-tests.sh -e core

e2e测试的日志默认在 $IMPALA_HOME/logs/ee_tests 里,custom cluster测试的日志默认在 $IMPALA_HOME/logs/custom_cluster_tests 里。里面会生成一些给Jenkins用的测试报告,可以知道哪些测试失败了。当然也可以在ternimal页面看到哪些失败了,用tmux的话记得把history-limit调大,免得历史输出被刷没了。比如在.tmux.conf中设置set-option -g history-limit 50000

如果想单独跑某个测试,可以用 bin/impala-py.test,比如:bin/impala-py.test custom_cluster/test_restart_services.py::TestGracefulShutdown::test_shutdown_idle

注:这种手动单独跑的custom cluster测试,日志是在/tmp下。

第四步:完善自动化测试脚本

GCE上的机器比较贵(当然程序员的时间也比较贵),因此把上述流程自动化很重要。我当时是下班时把GCE上的instance销毁,第二天上班时申请个新instance,再用脚本一次性把上面加载测试数据的事情做完,然后开始debug。

Impala代码里有不少自动化脚本,需要相应地针对GCS增加下功能,具体可以见 里面对 testdata 目录下的修改。

然后就是给bin/impala-config-local.sh加些用到的环境变量:

export TARGET_FILESYSTEM=gsexport GOOGLE_CLOUD_PROJECT_ID="gcp-dev-impala"export GOOGLE_CLOUD_SERVICE_ACCOUNT="impala-gcs-test"export GOOGLE_APPLICATION_CREDENTIALS="/home/quanlong/Impala/gcp-dev-impala-757905e09383.json"export GCS_BUCKET="quanlong-impala-test-01"

它们在你 source bin/impala-config.sh 时会被执行。

另外需要把之前生成的postgresql的dump文件和test-warehouse的数据压缩保留好,然后一行指令编译并加载它们:./buildall.sh -format -noclean -snapshot_file test-warehouse.tar.gz -metastore_snapshot_file HMS_home_systest_impala_cdp.sql -skiptests

第五步:修bug

测试过程发现了不少坑,这里记录一下印象最深的一个,而且是其它云存储也可能遇到的。

GCS提供了HDFS的API,直观上按HDFS来使用就可以了,但由于HDFS API定义不是很明确,在使用方式上引入了一个bug。具体地说就是在RemoteIterator的使用上出现的bug。

FileSystem有一个listStatusIterator接口,返回一个RemoteIterator,用来迭代式地对目录做listing:

public interface RemoteIterator
{
boolean hasNext() throws IOException; E next() throws IOException;}

这对于大目录特别有用,因为一次性得到目录下所有文件/目录的list结果会很慢,如果能迭代式地获取,客户端就能流式地处理返回的结果,效率大幅提高。官方文档写的使用方式是这样:

while (iterator.hasNext()) {
process(iterator.next());}

文档有提到当接口抛出NoSuchElementException时循环应该终止,但却没有提到FileNotFoundException的处理方式。在Impala代码里有一段奇特的用法,简化起来是这样:

while (true) {
try {
if (!iter.hasNext()) break; process(iter.next()); } catch (FileNotFoundException e) {
continue; }}

这段代码在GCS上会死循环!具体就是在hasNext()一直抛FileNotFoundException,永远不返回false的时候。

这个catch是在 里加的,背景是这样:Hive在写入数据时会先写到目标表下的临时目录,写成功后再把文件一并move到目标目录,然后把临时目录删掉。Impala在加载表的元数据时,可能会因为这些临时目录的删除而拿到FileNotFoundException,这时我们期望的动作是跳过这些异常,继续把文件元数据加载完(而不是因此就把整个操作失败掉,需要等invalidate metadata tableName来恢复)。

初衷是好的,但RemoteIterator的接口文档可没有说抛完FileNotFoundException后,这个iterator还是否有效,还能否继续被使用。

Impala的FileSystemUtil里就有个RecursingIterator实现了RemoteIterator接口,用来做递归listing。一般的RemoteIterator是不递归的,即只能得到当前目录下的文件/子目录信息。RecursingIterator包装了这些RemoteIterator,当遇到目录时,就调FileSystem的listStatusIterator()接口来递归地做listing。注:这段代码实际参考的是HDFS里 的实现。

这里把代码简要贴一下:

private static class RecursingIterator implements RemoteIterator
{
private final FileSystem fs_; private final Stack
> iters_ = new Stack<>(); private RemoteIterator
curIter_; private FileStatus curFile_; private RecursingIterator(FileSystem fs, Path startPath) throws IOException {
this.fs_ = Preconditions.checkNotNull(fs); curIter_ = fs.listStatusIterator(Preconditions.checkNotNull(startPath)); } @Override public boolean hasNext() throws IOException {
// 循环直到获得新的 FileStatus while (curFile_ == null) {
if (curIter_.hasNext()) {
// 处理子目录或文件 handleFileStat(curIter_.next()); } else if (!iters_.empty()) {
// 当前 iterator 处理完了,回到上层目录继续 listing curIter_ = iters_.pop(); } else {
// 所有 iterator 都处理完了,listing 结束 return false; } } return true; } private void handleFileStat(FileStatus fileStatus) throws IOException {
if (fileStatus.isFile()) {
curFile_ = fileStatus; return; } // 如果是子目录,获取它的listing iterator,先把当前目录的iterator压栈 RemoteIterator
iter = fs_.listStatusIterator(fileStatus.getPath()); iters_.push(curIter_); curIter_ = iter; curFile_ = fileStatus; } @Override public FileStatus next() throws IOException {
if (hasNext()) {
FileStatus result = curFile_; // 重置回 null 以让 hasNext() 获取新的 FileStatus curFile_ = null; return result; } throw new NoSuchElementException("No more entries"); } }

出现死循环的代码在 FilterIterator 中,它也是一个RemoteIterator的实现,这个iterator主要封装过滤无关文件的逻辑,代码里最重要的是对 isInIgnoredDirectory() 的调用,以及那个catch FileNotFoundException:

static class FilterIterator implements RemoteIterator
{
private final RemoteIterator
baseIterator_; private FileStatus curFile_ = null; private final Path startPath_; FilterIterator(Path startPath, RemoteIterator
baseIterator) {
startPath_ = Preconditions.checkNotNull(startPath); baseIterator_ = Preconditions.checkNotNull(baseIterator); } @Override public boolean hasNext() throws IOException {
while (curFile_ == null) {
FileStatus next; try {
if (!baseIterator_.hasNext()) return false; next = baseIterator_.next(); } catch (FileNotFoundException ex) {
LOG.warn(ex.getMessage()); continue; } if (!isInIgnoredDirectory(startPath_, next)) {
curFile_ = next; return true; } } return true; } @Override public FileStatus next() throws IOException {
if (hasNext()) {
FileStatus next = curFile_; curFile_ = null; return next; } throw new NoSuchElementException("No more entries"); } }

FilterIterator 要求使用的 iterator 是在抛 FileNotFoundException 后还是合法的,即还可以继续调 hasNext() 和 next(),直到 hasNext() 返回 false。这可不是 HDFS API 文档里有明确定义的!

在大部分情况下,FilterIterator 使用的就是前面那个 RecursingIterator。当 RecursingIterator 使用的是 HDFS(即 DistributedFileSystem)生成的 iterator 时,是能满足上述性质的。但使用 GCS(即GoogleHadoopFileSystem)生成的 iterator 就不行了,区别在于它们抛 FileNotFoundException 的时机不同。用下面这段代码中的注释来说明:

iterator = fs.listStatusIterator(path); // 如果path不存在,HDFS在此处抛FileNotFoundExceptionwhile (iterator.hasNext()) { // 如果path不存在,GCS在此处抛FileNotFoundException  process(iterator.next());}

区别就在于FileSystem#listStatusIterator这个API的实现,DistributedFileSystem会预拉取一些文件信息,因此会检查path是否存在,不存在就抛exception。而GCS没有实现这个API,使用的是FileSystem原生的实现,也就是只创建iterator对象(不发任何RPC),只有在hasNext()时发起RPC才知道path不存在,此时才抛exception。

这个区别为何致命呢?关键在 RecursingIterator#handleFileStat() 里,如果要递归下去的子目录实际已经被删了,HDFS会在对其调listStatusIterator()时抛exception,从而curIter_不会被改,而这个子目录的FileStatus也已经被读出了。下次 RecursingIterator#hasNext() 被调用时,就跳过这个不存在的子目录了。但在GCS里,listStatusIterator()没抛exception,curIter_就被换成子目录的iterator了,然后在hasNext()时才抛exception,无法改变任何状态,然后调用方FilterIterator不停调用hasNext() 进入死循环。

一个简单的修法就是封装一下对 fs.listStatusIterator() 的调用,每次得到iterator时都先调一次hasNext(),保证目录是存在的。这也是在GCS这个patch里采用的修法:

当然这只是让RecursingIterator又满足了前述性质,实际上 FilterIterator 并不能依赖这样的性质。因此在 ,我们把这个问题更彻底地修了一下。具体见

总结一下这个bug:

  • Impala在加载文件元信息时依赖了未定义的行为,即RemoteIterator在抛FileNotFoundException后还能继续被使用,直到hasNext()返回false为止。
  • 这个bug在DistributedFileSystem上没事,是因为目录不存在的话,listStatusIterator()会抛exception,根本不会给你这样的RemoteIterator。
  • 这个bug在GCS以及其它没有重载FileSystem#listStatusIterator()的存储上导致了死循环,因为能拿到RemoteIterator,但调用其hasNext()只会一直抛exception。不光是GCS,在ADLS、ABFS和老版本的S3实现都能重现。
  • 触发情况是存在过时的目录信息,又需要对其做file listing时。比如分区目录被手动删掉后,重新加载该表元数据时。或者加载元数据时列出某个目录的子项目后,里面有个临时子目录被删掉了,当要进入该子目录时也会触发。

另外, 在FileSystem里加了判断,如果目录不存在的话,listStatusIterator()会抛exception,也能避免这个bug,但需要hadoop 3.3.0, 3.2.2 及后续版本才有这个修改。在我开发GCS支持的时候,gcs-connector依赖的还只是3.2.1,所以才能发现这个bug。后来提了,现在gcs-connector已经把hadoop3依赖版本升级到了3.2.2,见

其它未来工作

目前Impala只是基本支持了GCS,即基本功能有了,但还有一些优化没有做。比如

  • GoogleHadoopFileSystem的list操作没有优化,性能比其它存储要慢不少,导致一些压力测试超时()。实际上 gsutil 的 flat listing 操作就快很多, 比如这样 gsutil ls gs://mybucket/**,它的代价只是返回的文件无序。我们在listing时并不需要什么顺序,所以这里可以优化。
  • GCS的InputStream没有实现CanUnbuffer接口,因此Impala没法安全地缓存GCS file handle( )
  • Spill-to-GCS需要测试找到合适的配置参数再启用()

最后,欢迎熟悉其它存储系统的同学为Impala开发对应的支持功能。Impala使用Gerrit来做Code Review,新开发者可以参考这两个文档:

有问题也可以在社区中讨论,参见《》。

转载地址:http://ojzgf.baihongyu.com/

你可能感兴趣的文章
【雅思】雅思写作作业(1)
查看>>
【雅思】【大作文】【审题作业】关于同不同意的审题作业(重点)
查看>>
【Loadrunner】通过loadrunner录制时候有事件但是白页无法出来登录页怎么办?
查看>>
【English】【托业】【四六级】写译高频词汇
查看>>
【托业】【新东方全真模拟】01~02-----P5~6
查看>>
【托业】【新东方全真模拟】03~04-----P5~6
查看>>
【托业】【新东方托业全真模拟】TEST05~06-----P5~6
查看>>
【托业】【新东方托业全真模拟】TEST09~10-----P5~6
查看>>
【托业】【新东方托业全真模拟】TEST07~08-----P5~6
查看>>
solver及其配置
查看>>
JAVA多线程之volatile 与 synchronized 的比较
查看>>
Java集合框架知识梳理
查看>>
笔试题(一)—— java基础
查看>>
Redis学习笔记(三)—— 使用redis客户端连接windows和linux下的redis并解决无法连接redis的问题
查看>>
Intellij IDEA使用(一)—— 安装Intellij IDEA(ideaIU-2017.2.3)并完成Intellij IDEA的简单配置
查看>>
Intellij IDEA使用(二)—— 在Intellij IDEA中配置JDK(SDK)
查看>>
Intellij IDEA使用(三)——在Intellij IDEA中配置Tomcat服务器
查看>>
Intellij IDEA使用(四)—— 使用Intellij IDEA创建静态的web(HTML)项目
查看>>
Intellij IDEA使用(五)—— Intellij IDEA在使用中的一些其他常用功能或常用配置收集
查看>>
Intellij IDEA使用(六)—— 使用Intellij IDEA创建Java项目并配置jar包
查看>>