本节书摘来异步社区《Spark Cookbook 中文版》一书中的第1章,第1.8节,作者: 【印度】Rishi Yadav(亚达夫)译者: 顾星竹 , 刘见康 责编: 胡俊英,更多章节内容可以访问云栖社区“异步社区”公众号查看。
1.8 使用Tachyon作为堆外存储层
Spark弹性分布式数据集(RDD)很适合在内存上存储数据集,可以在不同应用上存储大量备份。Tachyon可以解决Spark RDD管理的一些挑战性问题,如下所示。
RDD仅存在于Spark应用期间。
计算程序和RDD内存存储共享同样的执行过程;所以,如果一个进程崩溃了,那么内存存储也会消失。
即使处理同样的底层数据,不同作业的RDD是不能共享的,例如使用HDFS数据块。
慢速写入磁盘。
在内存中备份数据,更高的内存占用。
如果需要与其他应用程序共享输出,由于需要磁盘复制速度会非常慢。
Tachyon提供了堆外存储层来解决这些问题。该层(即堆外存储层)不受进程崩溃的影响也不会被垃圾回收器标记,同时也可以让RDD独立于特定的作业或对话之上实现跨应用共享。本质上,数据的一个存储在内存上的单一副本如图1-13所示。
1.8.1 具体步骤
1.下载并编译Tachyon(在默认情况下Tachyon配置的Hadoop版本为1.0.4,所以需要从源代码编译,选择正确的Hadoop版本)。替换当前版本,本书所写是版本为0.6.4。
$ wget https://github.com/amplab/tachyon/archive/v.zip
2.解压源代码。
$ unzip v-.zip
3.为了方便,重命名Tachyon源文件名。
$ mv tachyon- tachyon
4.修改Tachyon文件夹目录。
$ cd tachyon $ mvn -Dhadoop.version=2.4.0 clean package -DskipTests=true$ cdconf$ sudo mkdir -p /var/tachyon/journal$ sudo chown -R hduser:hduser /var/tachyon/journal$ sudo mkdir -p /var/tachyon/ramdisk$ sudo chown -R hduser:hduser /var/tachyon/ramdisk$ mv tachyon-env.sh.template tachyon-env.sh$ vi tachyon-env.sh
5.注释下面这行。
export TACHYON_UNDERFS_ADDRESS=$TACHYON_HOME/underfs
6.去掉下面这行前面的注释。
export TACHYON_UNDERFS_ADDRESS=hdfs://localhost:9000
7.修改以下属性。
-Dtachyon.master.journal.folder=/var/tachyon/journal/export TACHYON_RAM_FOLDER=/var/tachyon/ramdisk $ sudo mkdir -p /var/log/tachyon$ sudo chown -R hduser:hduser /var/log/tachyon$ vi log4j.properties
8.用/var/log/tachyon替换${tachyon.home}。
9.在conf目录下创建新文件core-site.xml。
$ sudo vi core-site.xmlfs.tachyon.impltachyon.hadoop.TFS$ cd ~$ sudo mv tachyon /opt/infoobjects/$ sudochown -R root:root /opt/infoobjects/tachyon$ sudochmod -R 755 /opt/infoobjects/tachyon
10.将/bin加入路径。
$ echo "export PATH=$PATH:/opt/infoobjects/tachyon/bin" >> /home/
hduser/.bashrc
11.重启shell并格式化Tachyon。
$ tachyon format
$ tachyon-start.sh local //you need to enter root password as
RamFS needs to be formatted
Tachyon的网页端口是http://hostname:19998,如图1-14所示。
12.运行以下程序观测Tachyon是否运行良好,如图1-15所示。
13.以下命令可以随时停止Tachyon。
$ tachyon-stop.sh
14.在Spark上运行Tachyon。
$ spark-shellscala> val words = sc.textFile("tachyon://localhost:19998/words")scala> words.countscala> words.saveAsTextFile("tachyon://localhost:19998/w2")scala> val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")scala> import org.apache.spark.api.java._scala> person.persist(StorageLevels.OFF_HEAP)