本文共 11785 字,大约阅读时间需要 39 分钟。
最近给Impala增加了对Google云存储(GCS)的支持(),记录一下开发笔记。希望能给有类似需求的朋友作参考,也欢迎大家给Impala增加阿里云存储(OSS)、腾讯云存储(COS)或其它存储的支持。
Impala是一个SQL引擎,本身并不管理数据,也没有持久化的状态,因此天然地适合存储计算分离的架构,可以很方便地部署到云上。比如Cloudera的CDP Public Cloud产品,就支持在AWS、Azure、GCP中使用Impala。目前Impala支持的存储除了HDFS、Kudu、HBase和本地文件系统(测试用)之外,还支持AWS的S3、Azure的ABFS和ADLS、Google云的GCS等。后面列的这些都是当成Hive表来读写的,因此它们首先要让Hive能支持,其次才是Impala。比如建表时,Impala是发RPC给HMS去建表,需要HMS去对应存储上创建目录等。
在 Impala 中,Java写的FE部分会用HDFS API获取文件的元数据(路径、大小、权限等),C++写的BE虽然调的是libhdfs,但其本质也是通过JNI使用HDFS的Java API。因此只要云存储实现了HDFS API的接口,对接Impala就很简单。下面是整个开发过程的记录。
GCS是Google云提供的对象存储服务:
GCS提供大部分文件操作的强一制性,只在权限相关的操作上会是最终一致:GCS实现了HDFS API,不过代码没有放到hadoop repo里,而是单独在一个的repo,使用这个gcs-connector,就能像操作HDFS一样来操作GCS。
接下来是credentials,云上都需要配access token之类的来保证访问的安全性。创建完存储的bucket后,还需要创建个service account并指定必要的角色,用它来访问GCS。对于hdfs应用来说,有两种方式,最简单的就是设置GOOGLE_APPLICATION_CREDENTIALS环境变量,指向service account提供的key file(一个包含credential的json文件)。另外也可以在core-site.xml里加credential信息,参考CDP文档
配置GOOGLE_APPLICATION_CREDENTIALS环境变量的另一个好处是,同时也配好了gsutil的credential。使用gsutil可以直接操作GCS,对debug很有帮助,而且gsutil在上传大量文件以及file-listing上的性能要远高于直接使用hdfs命令。相关操作文档:
配好credential后,在impala开发环境里重启Hive,就可以用Hive建表、读写数据了。接下来开始写impala这边的代码。
自从有了的重构之后,对新增存储系统的改动就集中了很多,FE里主要在FileSystemUtil里,照着其它存储类型的代码加上GCS的就行了。另外HdfsTable#assumeReadWriteAccess()里也需要加上GCS的判断,因为从GCS得到的权限信息实际上是假的(由fs.gs.reported.permissions配置),没必要去读取,在生成计划时假定所有路径都是可读写的就行了。
BE方面的改动可以参照s3的实现,简单 git grep -i s3 be
找下所有s3相关的代码,相应地都有对adls、abfs等的支持代码,加上gcs的就行了。当然有些地方要区别对待,比如file handle cache需要文件系统的InputStream实现CanUnbuffer接口,这个GCS还没有,不能照搬S3。
做完这些改动后就可以用impala操作GCS存储上的表了,接下来是跑测试。
Impala repo里有大量的e2e(即end-to-end)测试,但社区代码提交前只会跑hdfs模式的测试,测不到GCS新加的功能,因此需要自行跑通一下。
操作概要:
目前测试数据只能在hdfs上生成,生成之后可以把整个 test-warehouse 目录copy到GCS上,然后再把HMS里的location URL都改成指向GCS bucket的就行了。
我申请了台GCE上的机器,方便快速上传数据到GCS(测试数据总共有8.7GB)。操作是在impala目录里先用 bin/bootstrap_system.sh 装下依赖,然后直接用 ./buildall.sh -noclean -notests -testdata -format
来编译并生成测试数据。网络需要能访问S3的URL(下载依赖用),机器配置好的话应该一个小时内能跑完。这有几个坑值得提一下:
/var/lib/hadoop-hdfs
目录的owner需要是当前user,否则minicluster的hdfs无法启动。ulimit -u $(cat /proc/self/limits | grep 'Max processes' | awk '{ print $4 }')
数据在本地hdfs小集群加载完后,我们把它上传到GCS上。一开始我使用hdfs的cp指令,发现速度巨慢无比,可能是因为没有并发,而且GCS可能也没在这个API上做优化。于是换一种方式,先把整个 hdfs:///test-warehouse
目录 get 到本地,然后再用 gsutil 上传,速度就快了很多。指令可参考这个: gsutil -m cp -r test-warehouse gs://quanlong-impala-test-01/
quanlong-impala-test-01
是我在GCS上创建的bucket名称。 接下来是修改HMS中的元数据指向GCS的目录,这些表的location还是指向minicluster上的路径,都需要改。先把Hive和Impala关掉,然后把postgresql 里HMS的数据dump出来,sed改完后再重导回去。命令参考的是 testdata/bin/load-metastore-snapshot.sh
中的代码。操作如下:
# 关闭Hive和Impalatestdata/bin/kill-hive-server.shbin/start-impala-cluster.py --kill# dump已有原数据并修改locationpg_dump -U hiveuser HMS_home_systest_impala_cdp > HMS_home_systest_impala_cdp.sqlsed -i "s|hdfs://localhost:20500|gs://quanlong-impala-test-01|g" HMS_home_systest_impala_cdp.sql# 重建HMS元数据bin/create-test-configuration.sh -create_metastore # 导入修改后的元数据,并删除hdfs caching相关元数据psql -q -U hiveuser HMS_home_systest_impala_cdp_gcs < HMS_home_systest_impala_cdp.sqlpsql -q -U hiveuser -d ${METASTORE_DB} -c "delete from \"TABLE_PARAMS\" where \"PARAM_KEY\"='cache_directive_id'"psql -q -U hiveuser -d ${METASTORE_DB} -c "delete from \"PARTITION_PARAMS\" where \"PARAM_KEY\"='cache_directive_id'"
最后两行删除hdfs caching相关元数据很重要,因为hdfs caching功能只有在hdfs上能work,在GCS上会导致table metadata loading失败。
做完这些再启动Hive和Impala: testdata/bin/run-hive-server.sh && bin/start-impala-cluster.py
Impala的测试很多,包括BE和FE的单元测试,以及e2e tests、custom cluster tests等。其中e2e tests就是端到端测试,即给定查询和数据,然后验证查询的输出;custom cluster tests是非默认配置下的集群测试,每个测试都是按给定参数启动集群,然后跑测试。
和GCS有关的主要是e2e tests和custom cluster tests,测试指令如下:
export BE_TEST=falseexport FE_TEST=falseexport JDBC_TEST=falsebin/run-all-tests.sh -e core
e2e测试的日志默认在 $IMPALA_HOME/logs/ee_tests
里,custom cluster测试的日志默认在 $IMPALA_HOME/logs/custom_cluster_tests
里。里面会生成一些给Jenkins用的测试报告,可以知道哪些测试失败了。当然也可以在ternimal页面看到哪些失败了,用tmux的话记得把history-limit调大,免得历史输出被刷没了。比如在.tmux.conf
中设置set-option -g history-limit 50000
如果想单独跑某个测试,可以用 bin/impala-py.test,比如:bin/impala-py.test custom_cluster/test_restart_services.py::TestGracefulShutdown::test_shutdown_idle
GCE上的机器比较贵(当然程序员的时间也比较贵),因此把上述流程自动化很重要。我当时是下班时把GCE上的instance销毁,第二天上班时申请个新instance,再用脚本一次性把上面加载测试数据的事情做完,然后开始debug。
Impala代码里有不少自动化脚本,需要相应地针对GCS增加下功能,具体可以见 里面对 testdata 目录下的修改。
然后就是给bin/impala-config-local.sh
加些用到的环境变量:
export TARGET_FILESYSTEM=gsexport GOOGLE_CLOUD_PROJECT_ID="gcp-dev-impala"export GOOGLE_CLOUD_SERVICE_ACCOUNT="impala-gcs-test"export GOOGLE_APPLICATION_CREDENTIALS="/home/quanlong/Impala/gcp-dev-impala-757905e09383.json"export GCS_BUCKET="quanlong-impala-test-01"
它们在你 source bin/impala-config.sh 时会被执行。
另外需要把之前生成的postgresql的dump文件和test-warehouse的数据压缩保留好,然后一行指令编译并加载它们:./buildall.sh -format -noclean -snapshot_file test-warehouse.tar.gz -metastore_snapshot_file HMS_home_systest_impala_cdp.sql -skiptests
测试过程发现了不少坑,这里记录一下印象最深的一个,而且是其它云存储也可能遇到的。
GCS提供了HDFS的API,直观上按HDFS来使用就可以了,但由于HDFS API定义不是很明确,在使用方式上引入了一个bug。具体地说就是在RemoteIterator的使用上出现的bug。
FileSystem有一个listStatusIterator接口,返回一个RemoteIterator,用来迭代式地对目录做listing:
public interface RemoteIterator{ boolean hasNext() throws IOException; E next() throws IOException;}
这对于大目录特别有用,因为一次性得到目录下所有文件/目录的list结果会很慢,如果能迭代式地获取,客户端就能流式地处理返回的结果,效率大幅提高。官方文档写的使用方式是这样:
while (iterator.hasNext()) { process(iterator.next());}
文档有提到当接口抛出NoSuchElementException
时循环应该终止,但却没有提到FileNotFoundException
的处理方式。在Impala代码里有一段奇特的用法,简化起来是这样:
while (true) { try { if (!iter.hasNext()) break; process(iter.next()); } catch (FileNotFoundException e) { continue; }}
这段代码在GCS上会死循环!具体就是在hasNext()
一直抛FileNotFoundException
,永远不返回false的时候。
这个catch是在 里加的,背景是这样:Hive在写入数据时会先写到目标表下的临时目录,写成功后再把文件一并move到目标目录,然后把临时目录删掉。Impala在加载表的元数据时,可能会因为这些临时目录的删除而拿到FileNotFoundException
,这时我们期望的动作是跳过这些异常,继续把文件元数据加载完(而不是因此就把整个操作失败掉,需要等invalidate metadata tableName
来恢复)。
初衷是好的,但RemoteIterator
的接口文档可没有说抛完FileNotFoundException
后,这个iterator还是否有效,还能否继续被使用。
Impala的FileSystemUtil
里就有个RecursingIterator
实现了RemoteIterator接口,用来做递归listing。一般的RemoteIterator
是不递归的,即只能得到当前目录下的文件/子目录信息。RecursingIterator
包装了这些RemoteIterator
,当遇到目录时,就调FileSystem的listStatusIterator()
接口来递归地做listing。注:这段代码实际参考的是HDFS里 的实现。
这里把代码简要贴一下:
private static class RecursingIterator implements RemoteIterator{ private final FileSystem fs_; private final Stack > iters_ = new Stack<>(); private RemoteIterator curIter_; private FileStatus curFile_; private RecursingIterator(FileSystem fs, Path startPath) throws IOException { this.fs_ = Preconditions.checkNotNull(fs); curIter_ = fs.listStatusIterator(Preconditions.checkNotNull(startPath)); } @Override public boolean hasNext() throws IOException { // 循环直到获得新的 FileStatus while (curFile_ == null) { if (curIter_.hasNext()) { // 处理子目录或文件 handleFileStat(curIter_.next()); } else if (!iters_.empty()) { // 当前 iterator 处理完了,回到上层目录继续 listing curIter_ = iters_.pop(); } else { // 所有 iterator 都处理完了,listing 结束 return false; } } return true; } private void handleFileStat(FileStatus fileStatus) throws IOException { if (fileStatus.isFile()) { curFile_ = fileStatus; return; } // 如果是子目录,获取它的listing iterator,先把当前目录的iterator压栈 RemoteIterator iter = fs_.listStatusIterator(fileStatus.getPath()); iters_.push(curIter_); curIter_ = iter; curFile_ = fileStatus; } @Override public FileStatus next() throws IOException { if (hasNext()) { FileStatus result = curFile_; // 重置回 null 以让 hasNext() 获取新的 FileStatus curFile_ = null; return result; } throw new NoSuchElementException("No more entries"); } }
出现死循环的代码在 FilterIterator
中,它也是一个RemoteIterator的实现,这个iterator主要封装过滤无关文件的逻辑,代码里最重要的是对 isInIgnoredDirectory()
的调用,以及那个catch FileNotFoundException:
static class FilterIterator implements RemoteIterator{ private final RemoteIterator baseIterator_; private FileStatus curFile_ = null; private final Path startPath_; FilterIterator(Path startPath, RemoteIterator baseIterator) { startPath_ = Preconditions.checkNotNull(startPath); baseIterator_ = Preconditions.checkNotNull(baseIterator); } @Override public boolean hasNext() throws IOException { while (curFile_ == null) { FileStatus next; try { if (!baseIterator_.hasNext()) return false; next = baseIterator_.next(); } catch (FileNotFoundException ex) { LOG.warn(ex.getMessage()); continue; } if (!isInIgnoredDirectory(startPath_, next)) { curFile_ = next; return true; } } return true; } @Override public FileStatus next() throws IOException { if (hasNext()) { FileStatus next = curFile_; curFile_ = null; return next; } throw new NoSuchElementException("No more entries"); } }
FilterIterator
要求使用的 iterator 是在抛 FileNotFoundException 后还是合法的,即还可以继续调 hasNext() 和 next(),直到 hasNext() 返回 false。这可不是 HDFS API 文档里有明确定义的!
在大部分情况下,FilterIterator
使用的就是前面那个 RecursingIterator
。当 RecursingIterator
使用的是 HDFS(即 DistributedFileSystem)生成的 iterator 时,是能满足上述性质的。但使用 GCS(即GoogleHadoopFileSystem)生成的 iterator 就不行了,区别在于它们抛 FileNotFoundException 的时机不同。用下面这段代码中的注释来说明:
iterator = fs.listStatusIterator(path); // 如果path不存在,HDFS在此处抛FileNotFoundExceptionwhile (iterator.hasNext()) { // 如果path不存在,GCS在此处抛FileNotFoundException process(iterator.next());}
区别就在于FileSystem#listStatusIterator这个API的实现,DistributedFileSystem会预拉取一些文件信息,因此会检查path是否存在,不存在就抛exception。而GCS没有实现这个API,使用的是FileSystem原生的实现,也就是只创建iterator对象(不发任何RPC),只有在hasNext()时发起RPC才知道path不存在,此时才抛exception。
这个区别为何致命呢?关键在 RecursingIterator#handleFileStat()
里,如果要递归下去的子目录实际已经被删了,HDFS会在对其调listStatusIterator()时抛exception,从而curIter_不会被改,而这个子目录的FileStatus也已经被读出了。下次 RecursingIterator#hasNext()
被调用时,就跳过这个不存在的子目录了。但在GCS里,listStatusIterator()没抛exception,curIter_就被换成子目录的iterator了,然后在hasNext()时才抛exception,无法改变任何状态,然后调用方FilterIterator不停调用hasNext() 进入死循环。
一个简单的修法就是封装一下对 fs.listStatusIterator() 的调用,每次得到iterator时都先调一次hasNext(),保证目录是存在的。这也是在GCS这个patch里采用的修法:
当然这只是让RecursingIterator
又满足了前述性质,实际上 FilterIterator
并不能依赖这样的性质。因此在 ,我们把这个问题更彻底地修了一下。具体见
总结一下这个bug:
另外, 在FileSystem里加了判断,如果目录不存在的话,listStatusIterator()会抛exception,也能避免这个bug,但需要hadoop 3.3.0, 3.2.2 及后续版本才有这个修改。在我开发GCS支持的时候,gcs-connector依赖的还只是3.2.1,所以才能发现这个bug。后来提了,现在gcs-connector已经把hadoop3依赖版本升级到了3.2.2,见
目前Impala只是基本支持了GCS,即基本功能有了,但还有一些优化没有做。比如
gsutil ls gs://mybucket/**
,它的代价只是返回的文件无序。我们在listing时并不需要什么顺序,所以这里可以优化。最后,欢迎熟悉其它存储系统的同学为Impala开发对应的支持功能。Impala使用Gerrit来做Code Review,新开发者可以参考这两个文档:
有问题也可以在社区中讨论,参见《》。
转载地址:http://ojzgf.baihongyu.com/