1、Hadoop 最早起源于Nutch。Nutch的设计目标是构建一个大型的全网搜索引擎,包括网页抓取、索
引、查询等功能,但随着抓取网页数量的增加,遇到了严重的可扩展性问题 ——如何解决数十亿网页的
存储和索引问题
2、2003 -2004年谷歌发表的三篇论文为该问题提供了可行的解决方案
3、Nutch的开发人员完成了相应的开源实现HDFS 和 MapReduce,并从Nutch中剥离成为独立项目
Hadoop,到 2008 年 1 月,Hadoop 成为Apache 顶级项目,迎来了它的快速发展期。
1、Hadoop 是 Apache 旗下的一套开源软件平台
2、Hadoop 提供的功能:利用服务器集群,根据用户的自定义业务逻辑,对海量数据进行分布式处理
存储+运算
3、Hadoop 的核心组件有:
4、广义上来说,Hadoop 通常是指一个更广泛的概念–Hadoop 生态圈
1、创建一个wcinput文件夹,注意是在hdfs上面创建,不是在本地创建
2、在本地任意文件夹下面创建文件并上传到hdfs上面,我这里是在本地的/home/test目录下面创建的wordcount.txt
3、任意目录下运行官方wordcount程序
[bigdata@bigdata02 test]$ hadoop jar /home/bigdata/apps/hadoop-2.7.7/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar wordcount /wcinput /wcoutput
4、去hdfs上面查看
[bigdata@bigdata02 test]$ hadoop jar /home/bigdata/apps/hadoop-2.7.7/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar pi 5 5
注:
Number of Maps = 5
Samples per Map = 5
结果:
1、如果一个文件中有10个数值(一行一个,并且都是可以用int来度量), 现在求10个数值的和。
思路:
(1)逐行读取文件的内容
(2)把读取到的内容转化成int类型
(3)把转换之后的数据进行相加
(4)输出最后的一个累加和
2、假如,这样的文件有很大一堆, 并且每个文件都很大,而且每个文件里面的内容都很多
例如:现在有10000个文件,每个文件2T,文件里面的内容依然是每行一个数值,要求这一堆文件的所有数值的和。
如果我们使用单进程的程序来执行计算,最终可能也可以计算出来结果,但是效率很低。
一行一行读取, 串行
改进方案:
将串行 改成并行
分布式计算:
1、第一个阶段:先帮大的任务切分成小的任务,然后将集群中的每个节点都可以对这些小的任务来进行计算。
2、第二个节段:将之前的中间性的结果进行汇总。
3、又有个问题:该10000个2T的文件应该怎么分布, 才能让这10000个任务的执行效率达到最高?
如果集群有10000个节点,如果每个节点上面都存放了一个文件,然后对每个节点上面的数据启动我们的计算引擎进行任务的计算,这样效率高。
计算在a节点,存储的数据在b节点。效率不高。
数据传输
4、数据的处理:存储 和 计算 是怎么设计的?
存储和计算 相互依赖的。
在设计存储的时候,必须要考虑计算的问题
在设计计算的时候,必须要考虑存储的问题
存储: HDFS
计算: MapReduce
HDFS的设计思想:
帮存入到HDFS集群的数据均匀分散的存储到整个集群中。
举个栗子:
100g数据,集群有100个节点,按照1g的大小进行切分存储,每个节点要存储1g的数据量。
100g数据,集群有90个节点,按照1g的大小进行切分存储,其中90台服务器中有10台要存储2g的数据,有80台要存储1g的数据
假设1g的数据需要1秒的运算时间,那么整个任务需要2秒钟
100g数据,集群有90个节点,按照512m的大小进行切分存储,其中90台服务器中有20台要存储1.5g的数据,有70台要存储1g的数据
假设1g的数据需要1秒的运算时间,那么整个任务需要1.5秒钟
仅仅从上面的这个案例当中,得出下面的小结论:我们切分的块是不是越小越好
但是有问题:小文件很多就有问题
access.log 100g
block0 50g
block1 50g
access.log 100g
block0 20g
block1 20g
block2 20g
block3 20g
block4 20g
对于用户来说,一个文件完整的存储到hdfs进来的。所以用户再去下载这个文件的时候,要的完整的文件整体。要给所有的块合并回来,并且顺序不能错。
块越少,拼接越容易。
仅仅从上面的这个案例当中,得出下面的小结论:我们切分的块是不是越大越好
传统的中庸思想 不大不小最好
不大不小:HDFS在设计的时候考虑到了不同的应用场景,在每个不同的应用场景中,有可能需要的块的大小不一样。可以配置。
HDFS集群的块的大小,可以自己配置。
但是有默认的大小:
hadoop2.x版本以前,默认的块的大小:64m
hadoop2.x版本(含)以后,默认的块的大小:128m
让大数据能够存储到hdfs集群,并且考虑计算的效率问题,让文件切分存储,并且让这些块均匀分散的存储到整个集群中。
多节点存储
HDFS集群,理论上,可以无限制的加节点。
但是。加到一定的时候有上限。
1、HDFS集群是主从架构
namenode
2、加的机器的性能一般。
数据安全。
5、HDFS是怎么保障数据的安全的?
可以根据用户的配置配置多份。
配置多份。
hadoop0 hadoop1 hadoop2
多份数据的分布有原则的:
1、数据的备份的数量有用户指定。
2、如果一个文件存储多份,这多份数据完全没有必要在一个节点上面。
小问题:如果集群有三个存储节点,但是要求数据存储4份。
HDFS上面最终有几份数据?3份。
结论:hdfs集群中的任何一个节点,肯定没有完全相同的两份数据。
6、HDFS核心思想
1、一个大的文件要存储,必须要借助于分布式的存储系统
将大文件进行分而治之
分散存储
2、整个HDFS集群架设在不是特别牢靠的服务器之上,所以要保证数据安全。
采用副本的策略
冗余备份
备份的数据默认值 3
配置文件路径:/software/hadoop/etc/hadoop/hdfs-site.xml
HDFS 被设计成用来使用低廉的服务器来进行海量数据的存储,那是怎么做到的呢?
1、大文件被切割成小文件,使用分而治之的思想对同一个文件进行管理
2、每个切分之后的块都进行冗余存储,高可用不丢失
1、namenode:掌管文件系统的目录树,处理客户端的请求,保存元数据信息
2、datanode:存储实际的数据的,处理真正的读写
3、secondnamenode:分担namenode压力的,协助合并元数据信息
优点:
1、可构建在廉价机器上
通过多个副本来提高可靠性,文件切分多个块进行存储
2、高容错性
数据自动保存多个副本,副本丢失后,可以自动恢复
3、适合批处理
移动计算比移动数据方便
4、适合大数据处理
10k+节点规模
5、流式文件访问
一次写入,多次读取,可以保证数据的一致性
缺点:
不适于以下操作
1、要求高的数据访问
比如毫秒级
2、小文件存取
寻道时间超过读取时间
3、并发写入、文件随机修改
一个文件只能有一个写
仅仅支持追加
4、不适合存储小文件
存储一个1亿个小文件,大小仅仅1t,但是消耗掉20g左右的内存
-help
功能:输出这个命令参数手册
[root@hadoop0 software]# hadoop fs -help
-ls
功能:显示目录信息
示例: hadoop fs -ls hdfs://hadoop0:9000/
备注:这些参数中,所有的 hdfs 路径都可以简写成 hadoop fs -ls / 等同上条命令的效果
-put
功能:等同于 copyFromLocal,进行文件上传
示例:hadoop fs -put /aaa/jdk.tar.gz /bbb/jdk.tar.gz
-get
功能:等同于 copyToLocal,就是从 hdfs 下载文件到本地
示例:hadoop fs -get /aaa/jdk.tar.gz
-cp
功能:从 hdfs 的一个路径拷贝到 hdfs 的另一个路径
示例: hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2
-mv
功能:在 hdfs 目录中移动文件
示例: hadoop fs -mv /aaa/jdk.tar.gz /
-rm
功能:删除文件或文件夹
示例:hadoop fs -rm -r /aaa/bbb/
-rmdir
功能:删除空目录
示例:hadoop fs -rmdir /aaa/bbb/ccc
-moveFromLocal
功能:从本地剪切到 hdfs
示例:hadoop fs - moveFromLocal /home/hadoop/a.txt /aa/bb/cc/dd
-moveToLocal
功能:从 hdfs 剪切到本地
示例:hadoop fs - moveToLocal /aa/bb/cc/dd /home/hadoop/a.txt
-copyFromLocal
功能:从本地文件系统中拷贝文件到 hdfs 文件系统去
示例:hadoop fs -copyFromLocal ./jdk.tar.gz /aaa/
-copyToLocal
功能:从 hdfs 拷贝到本地
示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz
-appendToFile
功能:追加一个文件到已经存在的文件末尾
示例:hadoop fs -appendToFile ./hello.txt hdfs://hadoop0:9000/hello.txt
可以简写为:
hadoop fs -appendToFile ./hello.txt /hello.txt
-cat
功能:显示文件内容
hadoop fs -cat /hello.txt
-tail
功能:显示一个文件的末尾
示例:hadoop fs -tail /weblog/access_log.1
-text
功能:以字符形式打印一个文件的内容
示例:hadoop fs -text /weblog/access_log.1
-chgrp
-chmod
-chown
功能:linux 文件系统中的用法一样,对文件所属权限
示例:
hadoop fs -chmod 666 /hello.txt
hadoop fs -chown someuser:somegrp /hello.txt
-df
功能:统计文件系统的可用空间信息
示例:hadoop fs -df -h /
-du
功能:统计文件夹的大小信息
示例:hadoop fs -du -s -h /aaa/*
-count
功能:统计一个指定目录下的文件节点数量
示例:hadoop fs -count /aaa/
-setrep
功能:设置 hdfs 中文件的副本数量
示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz
查看 dfs 集群工作状态的命令:
hdfs dfsadmin -report
获取hdfs端口号:
hdfs getconf -confKey fs.default.name
注:
这里使用的是hadoop2.7.7版本
public void getFileSystem01() throws IOException {
//1.创建Configuration对象
Configuration cOnf= new Configuration();
//2.设置文件系统类型
conf.set("fs.defaultFS", "hdfs://bigdata02:8020");
//3.获取指定文件系统fileSystem;
FileSystem fileSystem = FileSystem.get(conf);
//4.输出测试
System.out.println(fileSystem);
}
(2)方式2:set方式+newInstance
public void getFileSystem02() throws IOException {
//1.创建Configuration对象
Configuration cOnf= new Configuration();
//2.设置文件系统类型
conf.set("fs.defaultFS", "hdfs://bigdata02:8020");
//3.获取指定文件系统fileSystem;
FileSystem fileSystem = FileSystem.newInstance(conf);
//4.输出测试
System.out.println(fileSystem);
}
(3)方式3:new URI+get
public void getFileSystem03() throws URISyntaxException, IOException {
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata02:8020"), new Configuration());
System.out.println(fileSystem);
}
(4)方式4:new URI+newInstance
public void getFileSystem04() throws URISyntaxException, IOException {
FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://bigdata02:8020"), new Configuration());
System.out.println(fileSystem);
}
public void listFiles() throws Exception {
//1.获取FileSystem实例
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata02:9000"), new Configuration());
//2、调用方法listFiles 获取/目录下的所有文件信息
RemoteIterator iterator = fileSystem.listFiles(new Path("/"), true);
//3、遍历迭代器
while (iterator.hasNext()) {
LocatedFileStatus fileStatus = iterator.next();
//获取文件的绝对路径:hdfs://bigdata:8020/xxx
System.out.println(fileStatus.getPath() +
"------------------" + fileStatus.getPath().getName());
//文件的block信息
BlockLocation[] blockLocatiOns= fileStatus.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println("主机为:"+host);
}
}
System.out.println("block数量为:"+blockLocations.length);
}
}
public void mkdirs() throws Exception {
//1、获取FileSystem实例
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata02:9000"), new Configuration(), "bigdata");
//2、创建文件夹
boolean bl = fileSystem.mkdirs(new Path("guxiaohao/bbb/ccc"));
fileSystem.create(new Path("guxiaohao/bbb/ccc/a.txt"));
fileSystem.create(new Path("guxiaohao2/bbb/ccc/a.txt"));
System.out.println(bl);
//3、关闭FileSystem
fileSystem.close();
}
public void uploadFile() throws Exception {
//1.获取FileSystem
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata02:9000"), new Configuration(), "bigdata");
//2、调用方法实现上传
fileSystem.copyFromLocalFile(new Path("G:\bigdata\bigdataDir\test3.txt"), new Path("/"));
//3、关闭FileSystem
fileSystem.close();
}
public void downloadFile1() throws Exception {
//1.获取FileSystem
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata02:9000"), new Configuration());
//2、调用方法实现下载
//注意设置false和true两个参数,否则IDEA运行报错:
//(null) entry in command string: null chmod 0644 G:bigdatabigdataDirtest_download.txt
fileSystem.copyToLocalFile(false, new Path("/test.txt"), new Path("G:\bigdata\bigdataDir\test_download.txt"), true);
//3、关闭FileSystem
fileSystem.close();
}
(2)通过输入输出流
public void downloadFile2() throws Exception {
//1.获取FileSystem
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata02:9000"), new Configuration());
//2、获取hdfs输入流
FSDataInputStream inputStream = fileSystem.open(new Path("/test.txt"));
//3、获取本地路径输出流
FileOutputStream outputStream = new FileOutputStream("G:\bigdata\bigdataDir\test_download2.txt");
//4、文件的拷贝
IOUtils.copy(inputStream, outputStream);
//5、关闭流
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
fileSystem.close();
}
在根目录下面新建一个文件夹
hadoop fs -mkdir -p /test
将test1.txt和test2.txt移动到/test文件夹下面
[bigdata@bigdata02 ~]# hadoop fs -mv /test1.txt /test
[bigdata@bigdata02 ~]# hadoop fs -mv /test2.txt /test
通过getmerge命令合并下载文件到当前目录下面:
[bigdata@bigdata02 ~]# hadoop fs -getmerge /test/*.txt ./test_merge.txt
[bigdata@bigdata02 ~]# ll
total 8
-rw-r--r--. 1 root root 36 Jun 24 19:50 test2.txt
-rw-r--r--. 1 root root 74 Jun 24 20:34 test_merge.txt
[bigdata@bigdata02 ~]# cat test_merge.txt
hello world
hello hadoop
hello hbasehello world
hello hive
hello flume
public void mergeFileDownload() throws Exception {
//1.获取FileSystem
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata02:9000"), new Configuration());
//2、获取本地文件系统
LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration());
//3、获取本地路径输出流
FileOutputStream outputStream = new FileOutputStream("G:\bigdata\bigdataDir\output_down.txt");
//4、获取hdfs下所有的文件详情
FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));
//5、遍历每个文件,获取每个文件的输入流,将输入流转为byte数组,写入(追加)输出流
for (FileStatus fileStatus : fileStatuses) {
//6、获取hdfs文件输入流
FSDataInputStream inputStream = fileSystem.open(fileStatus.getPath());
//7、将输入流转为byte数组,写入(追加)输出流
byte[] bt = new byte[1024];
int n = 0;
while (-1 != (n = inputStream.read(bt))) {
outputStream.write(bt, 0, n);
}
//8、关闭输入流·
IOUtils.closeQuietly(inputStream);
}
//9、关闭流
IOUtils.closeQuietly(outputStream);
localFileSystem.close();
fileSystem.close();
}
public void mergeFileUpload() throws Exception {
//1.获取FileSystem
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata02:9000"), new Configuration(), "bigdata");
//2、获取hdfs大文件输出流
FSDataOutputStream outputStream = fileSystem.create(new Path("/test.txt"));
//3、获取一个本地文件系统
LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration());
//4、获取本地文件夹下所有的文件详情
FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("G:\bigdata\bigdataDir\input"));
//5、遍历每个文件,获取每个文件的输入流
for (FileStatus fileStatus : fileStatuses) {
FSDataInputStream inputStream = localFileSystem.open(fileStatus.getPath());
//6、将小文件的数据复制到大文件
IOUtils.copy(inputStream, outputStream);
IOUtils.closeQuietly(inputStream);
}
//7、关闭流
IOUtils.closeQuietly(outputStream);
localFileSystem.close();
fileSystem.close();
}