热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

HADOOP学习笔记(1)

ubuntu虚拟机,带有gitlab和jenkins等测试环境:http:pan.baidu.coms1jIhljyI密码:z39mHadoop说明Ha

ubuntu虚拟机,带有gitlab和jenkins等测试环境:http://pan.baidu.com/s/1jIhljyI密码: z39m

              

Hadoop说明

  Hadoop将帮助解决如下特殊问题:

Ø  使用Hadoop分布式文件系统(HDFS)来存储海量数据集,通过MapReduce对这些数据集运行分布式计算。

Ø  熟悉Hadoop的数据和I/O构建,用于压缩、数据集成、序列化和持久处理

Ø  洞悉编写MapReduce实际应用时的常见陷阱和高级特性

Ø  设计、构建和管理一个专用的Hadoop集群或在云上运行Hadoop

Ø  使用高级查询语言Pig来处理大规模数据

Ø  利用Hadoop数据库HBase来保存和处理结构化/半结构化数据

Ø  学会使用ZooKeeper来构建分布式系统


 

                 第一章 Hadoop概述

   Hadoop提供了一个稳定的共享存储和分析系统。存储由HDFS(Hadoop分布式文件管理系统)实现,分析有MapReduce实现。这是Hadoop的核心。MapReduce针对每个查询,每一个数据集(键/值  )(至少是很大一部分)都会被处理。MapReduce可以处理一批查询,并且它针对整个数据集处理。MapReduce很适合处理那些需要分析整个数据集的问题,以批处理的方式,尤其是Ad Hoc(自主或即时)分析。RDBMS适用于点查询和更新(其中,数据集已经被索引以提供延迟的检索和短时间的少量数据更新)。MapReduce适合数据被一次写入和多次读取的应用,而关系数据库更适合持续更新的数据集。

                                   

    MapReduce对于非结构化或半结构化数据非常有效,它被设计为处理时间内解释数据。MapReduce输入的键和值并不是数据固有的属性,它们是由分析数据源的人来选择的。

    MapReduce是一种线性的课伸缩的编程模式。程序员编写两个函数---map函数和Reduce函数----每一个都定义一个键/值对集映射到另一个。这些函数无视数据的大小或者它们正在使用的集群的特性,这样它们就可以原封不动地应用到小规模数据集或者大的数据集上。更重要的是,如果放入两倍的数据量,运行的时间会少于两倍。但是如果是两倍大小的集群,一个任务任然只是和原来的一样快。基于MapReduce的高级查询语言(如Pig和Hive)使MapReduce的系统更接近传统的数据库编程语言。

    高性能计算(High Performance Computing,HPC)和网格计算社区多年来一直在做大规模的数据处理,它们使用的是消息传递接口(MessagePassing Interface,MPI)这样的API。从广义上来讲,高性能计算的方法是将作业分配给一个机器集群,这些机器访问共享文件系统,由于一个存储区域网络(Storage Area Network,SAN)进行管理。这个非常适用于以主计算机密集型为主的作业,但当节点需要访问的大数据量时,这也成为一个问题,因为网络带宽成为“瓶颈”,所以计算机节点闲置下来了。

    MapReduce尝试在计算节点本地存储数据,因此数据访问速度回因为它是本地数据而比较快。这项“数据本地化”功能,成为MapReduce的核心功能并且也是它拥有良好性能的原因之一。MapReduce检测失败的map或者reduce任务并且在健康的机器上重现安排任务。MapReduce能够做到这一点,得益于它是一个无共享的架构,这也意味着各任务之间彼此并不依赖。(mapper的输出是反馈给reducer的,但这通过MapReduce系统控制。在这种情况下,相当于返回失败的map,应该对返回reducer给予更多关注,因为它必须确保它可以检束到必要的map输出,如果不行,必须重新运行相关的map从而生成必要的这些输出。)

    MapReduce限定于键/值对的类型,mapper和reducer彼此间的协作有限,一个接一个地运行(mapper传输键/值对给reducer)。MapReduce作为一个建立搜索索引产品系统。MapReduce被设计为用来运行那些需要数分钟或数小时的作业,这些作业在一个聚集带宽很高的数据中心中可信任的专用硬件设备上运行。

1.1 Hadoop子项目

         Hadoop最出名的是MapReduce及其分布式文件系统(HDFS),还有其他子项目提供配套服务,其他子项目提供补充性服务。这些子项目的简要描述如下,其他技术栈如下图:

                                                     

         Core:一系列分布式文件系统和通用I/O的组件和接口(序列化、JavaRPC和持久化数据结构)。

         Avro:一种提供高效、跨语言RPC的数据序列系统,持久化数据存储。

         MapReduce:分布式数据处理模式和执行环境,运行与大型商用机集群。

         HDFS:分布式文件系统,运行于大型商用机集群。

         Pig:一种数据流语言和运行环境,用于检索非常大的数据集。Pig运行在MapReduce和HDFS的集群上。

         Hbase:一个分布式的、列存储数据库。HBase使用HDFS作为底层存储,同时支持MapReduce的批量式计算和点查询(随机读取)。

         ZooKeeper:一个分布式的、高可用性的协调服务。ZooKeeper提供分布式锁之类的基本服务用于构建分布式应用。

         Hive:分布式数据仓库。Hive管理HDFS中存储的数据,并提供基于SQL的查询语言(由运行时引擎翻译长MapReduce作业)用于查询数据。

        Chukwa:分布式数据收集和分析系统。Chukwa运行HDFS中存储数据的收集器,它使用MapReduce来生成报告。






第2章 MapReduce简介

 

         MapReduce是一个用于数据处理的编程模型。同一个程序Hadoop可以运行于用各种语言编写的MapReduce程序上。MapReduce程序本质上是并行的,因此可以将大规模的数据分析交给任何一个拥有足够多机器的运营商。MapReduce的优势在与处理大型数据集。

2.1 一个气象数据集

         在这个例子,要编写一个挖掘气象数据的程序。分布在全球各地的气象传感器每隔一个小时便收集当地的气象数据,从而积累了大量的日志数据。这些日志数据适合用MapReduce进行分析的最佳候选,它们是半结构化且面向记录的数据。

2.1.1 数据格式

         我们将使用NCDC(国家气候数据中心)提供的数据。数据是以面向行的ASCII格式存储的,每一行便是一个记录。改格式支持许多气象元素,其中许多数据是可选的或长度可变的。为简单起见,将重点讨论基本元素(如气温),这些数据是始终都有且有固定宽带的。

         如下一些国家气候数据中心数据记录的格式,该行已被分成多行以显示出每个字段,在实际文件中,字段被整合成一行且没有任何分隔符。

                                                     

        

         数据文件按照日期和气象站进行组织。从1901年到2001年,每一年都有一个目录,每一个目录都包括一个打包文件,文件中的每一个气象站都带有当年的数据。例如,1990年的前面的数据项如下:

                                                    

2.2使用Unix Tools来分析数据

         对于面向行的数据,传统的处理工具是awk。下面有一个小程序脚本,用于计算每年的最高气温。

#! /usr/bin/env bash
for year in all/*
doecho -ne 'basename $year .gz' "\t"gunzip -c $year | \awk '{ temp=substr($0,88,5)+0;q=substr($0,93,1);if(temp!=9999 && q ~ /[01459]/ && temp>max) max=temp}END {print max}'
done

         该脚本循环遍历压缩文件,首先显示年份,然后使用awk处理每个文件。awk脚本从数据中提取两个字段:气温和质量代码。气温值通过加上一个0变成一个整数。接下来,执行测试,从而判断气温值是否有效,质量代码显示的读数是有疑问还是根本就是错误的。如果读数是正确的,那么该值将与目前看到的最大值进行比较,如果该值比原先的最大值大,就替换掉目前的最大值。当文件中所有的行都已处理完毕并打印出最大值后,END块中的代码才会被执行。

         为完成跨越一世纪这么长时间的查找,程序在EC2 High-CPU Extra Large Instance机器上运行了42分钟。

 

 

2.3 使用Hadoop进行数据分析

         为了更好地发挥Hadoop提供的并行处理机制的优势,必须把查询表示成MapReduce作业。经过一些本地的小规模测试,将能够在在机器集群上运行它。

2.3.1 map和reduce

         MapReduce的工作过程分为两个阶段:map阶段和reduce阶段。每个阶段都有键/值对作为输入和输出,并且它们的类型可由程序员选择。程序员还具体定义了两个函数:map函数和reduce函数。

         map阶段输入的是原始的数据,选择的是一种文本输入格式,以便数据集的每一行都会是一个文本值。键是在文件开头部分文本行起始处的偏移量。map函数很简单,通过map函数来找出年份和气温。在本例中,map函数只是一个数据准备阶段,通过这种方式来建立数据,使得reducer函数能在此基础上进行工作:找出每年的最高气温。map函数也是很适合去除已损记录的地方:在这里,我们将筛选掉缺失的、不可靠的或错误的气温数据。

         为了全面了解map的工作方式,思考下面几行示例的输入数据(考虑到页面篇幅,一些未使用的列已被去除,用省略号表示):

                      

         这些键/值对的方式来表示map函数:

                     

         键是文件中的行偏移量,而往往是我们在map函数中所忽视的。map函数的功能仅仅提取年份和气温(以粗体显示),并将其作为输出被发送。(气温值已被解释为整数)

                                 

         map函数的输出先由MapReduce框架处理,然后再被发送到reduce函数。这一处理过程根据键/值对进行排序和分组。因此,reduce函数会看到如下输入:

                      

         每年的年份后都有一系列气温读数。所有reduce函数现在必须重复这个列表并从中找出最大的读数:

                 

         这是最后的输出:全年气温记录中每年的最高气温。

         这个数据流下图所示。在图的底部是Unix的管道,模拟整个MapReduce的流程:

                      

        

2.3.2 Java MapReduce

         了解MapReduce程序的工作原理之后,下一步就是要用代码来实现它。需要有三样东西:一个map函数、一个reduce函数和一些来运行作业的代码。map函数是由一个Mapper接口来实现,其中声明了一个map()方法。map函数的实现:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBaseimplements Mapper
{private static final int MISSING=9999;public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter)throw IOException{String line=value.toString();String year=line.substring(15,19);if(line.charAt(87)=='+'){airTemperture=Integer.parseInt(line.substring(88,92));}else{airTemperture=Integer.parseInt(line.substring(87,92));}String quality=line.substring(92,93);if(airTemperture !=MISSING && quality.matches("[01459]")){output.collect(new Text(year),new IntWritable(airTemperture));}}
}      

         Mapper接口是一个泛型类型,它有4个形式参数类型,由它们来指定map函数的输入键、输入值、输出键和输出值得类型。就上述代码而言,输入键是一个长整数偏移量,输入的值是一行文本,输出的键是年份,输出的值是气温(整数)。Hadoop规定了一套可用于网络序列优化的基本类型,而不是使用内置的Java类型。这些都可以在importorg.apache.hadoop.io中找到。LongWritable类型相当于Java中的Long类型,Text类型相当于Java的String类型和IntWritable类型相当于Java的Integer类型。

         map()方法需要传入一个键和一个值。将一个包含Java字符串输入行的Text值转换成Java的String类型,然后利用其substring()方法提取的相应的列。

         map()方法还提供一个OutputCollector实例来写输入输出内容。在这种情况下,写入年份作为一个Text对象(只使用一个键),用IntWritable类型包装气温值。只有在气温显示出来后并且它的质量代码表示的是正确的气温读数时才能写入输出记录。

         reduce函数同样使用Reducer时被定义,最高气温示例的Reducer:  

import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;public class MaxTemperatureReducer extends MapReduceBaseimplements Reducer
{public void reduce(Text key,Iterator values,OutputCollector output,Reporter reporter)throw IOException{int maxValue=Integer.MIN_VALUE;while(values.hasNext()){maxValue=Math.max(maxValue,values.next().get());}output.collect(key,new IntWritable(maxValue));}
}    

         四个形式参数类型用于指定reduce函数的输入和输出类型。reduce函数的输入类型必须与map函数的输出类型相匹配:Text类型和IntWritable类型。在这中情况下,reduce函数的输出类型是Text和IntWritable这两种类型,前者是年份的类型而后者是最高气温的类型,在这些输入类型之中,遍历所有气温,并把每个记录进行比较知道找到一个最高的为止。

第三部分代码运行的是MapReduce作业,在气象数据集中找出最高气温的应用程序:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;public class MaxTemperture{public static void main(String [] args) throws IOException{if(args.length !=2){System.err.println("Usage:MaxTemperture{ ");System.exit(-1);}JobConf conf=new JobConf(MaxTemperture.class);conf.setJobName("Max temperture");FileInputFormat.addInputPath(conf,new Path(args[0]));FileOutputFormat.setOutputPath(conf,new Path(args[1]));conf.setMapperClass(MaxTempertureMapper.class);conf.setReducerClass(MaxTempertureReducer.class);conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);JobClient.runJob(conf);}
}


        对象指定了作业的各种参数。它授予用户对整个代码如何运行的控制权。当用户在Hadoop集群上运行这个作业时,用户把代码打包成一个JAR文件(Hadoop会在集群分发这个包)。用户没有明确指定JAR文件的名称,而是在JobConf构造函数中传送一个类,Hadoop会找到这个包含此类的JAR文件。

         在创建JobConf对象后,用户将指定输入好输出的路径。通过调用FileInputFormat内的静态方法addInputPath()来定义输入的路径,它可以是单个文件、目录或文件模式的路径。同时,addInputPath()可以被调用多次从而实现使用多路径输入。

         输出路径(其中只有一个)是在FileOutputFormat内的静态方法setOutputPath()来指定。它指定了reduce函数输出文件写入的目录。在运行作业前该目录不应该存在,否则Hadoop会报错并且拒绝运行任务。这种预防措施是为了防止数据丢失。

         接下类,通过setMapperClass()和setReducerClass()这两个方法来指定要使用的map和reduce类型。

         setOutputKeyClass()和setOutputValueClass()方法控制map和reduce函数的输出类型,正如本例所示,这两个方法往往是相同的。如果它们不同,那么map的输出类型可设置成使用setMapOutputKeyClass()和setMapOutputValueClass()方法。输入的类型通过输入格式来控制,用户没有设置,因为用户使用的是默认的TextInputFormat(文本输入格式)。在设置了定义map和reduce函数的类之后,运行作业的准备工作就算完成了。JobClient内的静态方法runJob()会提交作业并等待它完成,把进展情况写入控制台。

         写完MapReduce作业之后,拿一个小型的数据集进行测试以排除与代码直接有关的问题。首先,以独立模式安装Hadoop。在这种模式写,Hadoop运行中使用本地带job runner(作业运行程序)的文件系统。用前面讨论过的五行代码的例子来测试(命令):

% export HADOOP_CLASSPATH=build/classes
% hadoop MaxTemperature input/ncdc/sample.txt output

         如果Hadoop命令是以类名作为第一个参数,它会启动一个JVM来运行这个类。使用命令比直接使用Java更方便,因为前者把类的路径(及其依赖关系)加入Hadoop的库中,并获得Hadoop的配置。要添加应用程序类的路径,用户需要定义一个环境变量,Hadoop脚本会来执行相关操作。

注意:以本地(独立)模式运行时,笔记中所有程序希望都以这种方式来设置HADOOP_CLA-

SSPATH。命令必须在示例代码所在的文件夹下被运行。

         运行作业所得到的输出提供了一些有用的信息。(无法找到作业JAR文件相关信息是意料之中的,因为是在本地模式下没有JAR的情况下运行的。在集群上运行时,不会看到此警告。)输出的最后一部分叫“计数器”(Counter),显示了在Hadoop上运行的每个作业产生的统计信息。这些对检查处理的数据量是否符合预期非常有用。例如,可以遵循整个系统中记录的数目:5个map输入产生了5个map的输出,然后5个reduce输入产生两个reduce输出。

         输出被写入到output目录,其中每个reducer包括一个输出文件。作业包含一个reducer,所以只能找到一个文件名,名为part-0000:

           

         这个结果和之前手动搜索的结果一样。可以把这个结果解释为在1949年的最高气温记录为11.1摄氏度,而在1950年为2.2摄氏度。


推荐阅读
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 大数据领域的职业路径与角色解析
    本文将深入探讨大数据领域的各种职业和工作角色,帮助读者全面了解大数据行业的需求、市场趋势,以及从入门到高级专业人士的职业发展路径。文章还将详细介绍不同公司对大数据人才的需求,并解析各岗位的具体职责、所需技能和经验。 ... [详细]
  • 本文介绍了如何使用Hive分析用户最长连续登录天数的方法。首先对数据进行排序,然后计算相邻日期之间的差值,接着按用户ID分组并累加连续登录天数,最后求出每个用户的最大连续登录天数。此外,还探讨了该方法在其他领域的应用,如股票市场中最大连续涨停天数的分析。 ... [详细]
  • 我的读书清单(持续更新)201705311.《一千零一夜》2006(四五年级)2.《中华上下五千年》2008(初一)3.《鲁滨孙漂流记》2008(初二)4.《钢铁是怎样炼成的》20 ... [详细]
  • 本文介绍如何通过整合SparkSQL与Hive来构建高效的用户画像环境,提高数据处理速度和查询效率。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 本文深入探讨了NoSQL数据库的四大主要类型:键值对存储、文档存储、列式存储和图数据库。NoSQL(Not Only SQL)是指一系列非关系型数据库系统,它们不依赖于固定模式的数据存储方式,能够灵活处理大规模、高并发的数据需求。键值对存储适用于简单的数据结构;文档存储支持复杂的数据对象;列式存储优化了大数据量的读写性能;而图数据库则擅长处理复杂的关系网络。每种类型的NoSQL数据库都有其独特的优势和应用场景,本文将详细分析它们的特点及应用实例。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 【漫画解析】数据已删,存储空间为何未减?揭秘背后真相
    在数据迁移过程中,即使删除了原有数据,存储空间却未必会相应减少。本文通过漫画形式解析了这一现象背后的真相。具体来说,使用 `mysqldump` 命令进行数据导出时,该工具作为 MySQL 的逻辑备份工具,通过连接数据库并查询所需数据,将其转换为 SQL 语句。然而,这种操作并不会立即释放存储空间,因为数据库系统可能保留了已删除数据的碎片信息。文章进一步探讨了如何优化存储管理,以确保数据删除后能够有效回收存储空间。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 在Linux系统中,原本已安装了多个版本的Python 2,并且还安装了Anaconda,其中包含了Python 3。本文详细介绍了如何通过配置环境变量,使系统默认使用指定版本的Python,以便在不同版本之间轻松切换。此外,文章还提供了具体的实践步骤和注意事项,帮助用户高效地管理和使用不同版本的Python环境。 ... [详细]
  • 分布式一致性算法:Paxos 的企业级实战
    一、简介首先我们这个平台是ES专题技术的分享平台,众所周知,ES是一个典型的分布式系统。在工作和学习中,我们可能都已经接触和学习过多种不同的分布式系统了,各 ... [详细]
  • 通过使用Sqoop导入工具,可以精确控制并高效地将表数据的特定子集导入到HDFS中。具体而言,可以通过在导入命令中添加WHERE子句来指定所需的数据范围,从而在数据库服务器上执行相应的SQL查询,并将查询结果高效地存储到HDFS中。这种方法不仅提高了数据导入的灵活性,还确保了数据的准确性和完整性。 ... [详细]
author-avatar
歌手王紫璇
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有