热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark从入门到放弃一:worldcountjava

文章地址:http:www.haha174.toparticledetails253584项目源码:https:github.comhaha174spa

文章地址:http://www.haha174.top/article/details/253584
项目源码:https://github.com/haha174/spark.git
开启spark 从入门到放弃/笑哭。下面不多说来写一个hello world 压压惊。
之前搭建集群用的是 spark 2.2 hadoop 2.9
所以开发的需要引入如下的依赖

<properties><project.build.sourceEncoding>UTF-8project.build.sourceEncoding><spark.version>2.2.1spark.version><java.version>1.8java.version><hadoop.version>2.9.0hadoop.version>properties><dependencies><dependency><groupId>junitgroupId><artifactId>junitartifactId><version>4.12version><scope>testscope>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-core_2.10artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-sql_2.10artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-hive_2.10artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming_2.10artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.hadoopgroupId><artifactId>hadoop-clientartifactId><version>${hadoop.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming-kafka_2.11artifactId><version>1.6.2version>dependency>dependencies>

/*** 第一步 &#xff0c;创建SparkConf* setMaster 设置 集群 master的url 如果设置为local 表示在本地运行*/SparkConf conf&#61;new SparkConf().setAppName("WorldCountLocal").setMaster("local");

/**
* 第二步 创建SparkContext 对象
* 在spark 中SparkContext 是spark 所有功能的入口 无论使用的是java scala 甚至py 编写都必须有一个SparkContext
* 它的主要作用包括初始化spark 应用程序所需要的一些核心组件&#xff0c;包括调度器&#xff08;DAGSchedule,taskScheduler&#xff09;,他还会去spark master 节点上去注册等等
* 但是呢&#xff0c;在spark中编写不同类型的spark 应用程序&#xff0c;使用的SparkContext
* 如果使用scala 使用 使用原生的SparkContext
* 如果使用java 那么就是用JavaSparkContext
* 如果使用Spark Sql 程序 那么就是 SQLContext,HiveContext
* 如果开发Spark Streaming 程序 那么就是它独有的SparkContext
* 以此类推
*/

JavaSparkContext sc&#61;new JavaSparkContext(conf);

/**
* 第三步&#xff1a; 要针对输入源&#xff08;hdfs文件,本地文件,等等&#xff09;创建一个初始的RDD
* 输入源中的数据会被打散&#xff0c;分配到RDD的每个partition 中从而形成一个初始的分布式数据集
* 本次测试 所以针对本地文件
* SparkContext 中用于根据文件 类型的输入源创建RDD 的方法&#xff0c;叫做textFile()
* 在我们这里呢 RDD 中 有元素这种概念 如果是 hdfs 或者本地文件呢 创建RDD 每一个文件就相当于文件里面的一行
*/

JavaRDD lines&#61;sc.textFile("C:\\Users\\haha174\\Desktop\\data\\world-count.txt");

// 第四步&#xff1a;对初始RDD进行transformation操作&#xff0c;也就是一些计算操作
// 通常操作会通过创建function&#xff0c;并配合RDD的map、flatMap等算子来执行
// function&#xff0c;通常&#xff0c;如果比较简单&#xff0c;则创建指定Function的匿名内部类
// 但是如果function比较复杂&#xff0c;则会单独创建一个类&#xff0c;作为实现这个function接口的类
// 先将每一行拆分成单个的单词
// FlatMapFunction&#xff0c;有两个泛型参数&#xff0c;分别代表了输入和输出类型
// 我们这里呢&#xff0c;输入肯定是String&#xff0c;因为是一行一行的文本&#xff0c;输出&#xff0c;其实也是String&#xff0c;因为是每一行的文本
// 这里先简要介绍flatMap算子的作用&#xff0c;其实就是&#xff0c;将RDD的一个元素&#xff0c;给拆分成一个或多个元素
JavaRDD<String> words &#61; lines.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split("-")) .iterator();
}
});

// 接着&#xff0c;需要将每一个单词&#xff0c;映射为(单词, 1)的这种格式
// 因为只有这样&#xff0c;后面才能根据单词作为key&#xff0c;来进行每个单词的出现次数的累加
// mapToPair&#xff0c;其实就是将每个元素&#xff0c;映射为一个(v1,v2)这样的Tuple2类型的元素
// 如果大家还记得scala里面讲的tuple&#xff0c;那么没错&#xff0c;这里的tuple2就是scala类型&#xff0c;包含了两个值
// mapToPair这个算子&#xff0c;要求的是与PairFunction配合使用&#xff0c;第一个泛型参数代表了输入类型
// 第二个和第三个泛型参数&#xff0c;代表的输出的Tuple2的第一个值和第二个值的类型
// JavaPairRDD的两个泛型参数&#xff0c;分别代表了tuple元素的第一个值和第二个值的类型
JavaPairRDD pairs &#61; words.mapToPair(new PairFunction() {private static final long serialVersionUID &#61; 1L;public Tuple2 call(String word) throws Exception {return new Tuple2(word, 1);}});
// 接着&#xff0c;需要以单词作为key&#xff0c;统计每个单词出现的次数
// 这里要使用reduceByKey这个算子&#xff0c;对每个key对应的value&#xff0c;都进行reduce操作
// 比如JavaPairRDD中有几个元素&#xff0c;分别为(hello, 1) (hello, 1) (hello, 1) (world, 1)
// reduce操作&#xff0c;相当于是把第一个值和第二个值进行计算&#xff0c;然后再将结果与第三个值进行计算
// 比如这里的hello&#xff0c;那么就相当于是&#xff0c;首先是1 &#43; 1 &#61; 2&#xff0c;然后再将2 &#43; 1 &#61; 3
// 最后返回的JavaPairRDD中的元素&#xff0c;也是tuple&#xff0c;但是第一个值就是每个key&#xff0c;第二个值就是key的value
// reduce之后的结果&#xff0c;相当于就是每个单词出现的次数
/*** public interface Function2 extends Serializable {*R call(T1 var1, T2 var2) throws Exception;*}* 第三个参数表示返回类型*/
JavaPairRDD wordCounts &#61; pairs.reduceByKey(new Function2() {private static final long serialVersionUID &#61; 1L;public Integer call(Integer v1, Integer v2) throws Exception {return v1 &#43; v2;}});
// 到这里为止&#xff0c;我们通过几个Spark算子操作&#xff0c;已经统计出了单词的次数

// 到这里为止&#xff0c;我们通过几个Spark算子操作&#xff0c;已经统计出了单词的次数// 但是&#xff0c;之前我们使用的flatMap、mapToPair、reduceByKey这种操作&#xff0c;都叫做transformation操作// 一个Spark应用中&#xff0c;光是有transformation操作&#xff0c;是不行的&#xff0c;是不会执行的&#xff0c;必须要有一种叫做action// 接着&#xff0c;最后&#xff0c;可以使用一种叫做action操作的&#xff0c;比如说&#xff0c;foreach&#xff0c;来触发程序的执行wordCounts.foreach(new VoidFunction>() {private static final long serialVersionUID &#61; 1L;public void call(Tuple2 wordCount) throws Exception {System.out.println(wordCount._1 &#43; " appeared " &#43; wordCount._2 &#43; " times.");}});sc.close();
}


推荐阅读
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Apache Hadoop HDFS QJournalProtocol 中 getJournalCTime 方法的应用与代码实例分析 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
  • 本文介绍了如何使用Java和PDFBox库根据坐标值对PDF文件进行局部切割的方法。 ... [详细]
  • Spark中使用map或flatMap将DataSet[A]转换为DataSet[B]时Schema变为Binary的问题及解决方案
    本文探讨了在使用Spark的map或flatMap算子将一个数据集转换为另一个数据集时,遇到的Schema变为Binary的问题,并提供了详细的解决方案。 ... [详细]
  • 为了在Hadoop 2.7.2中实现对Snappy压缩和解压功能的原生支持,本文详细介绍了如何重新编译Hadoop源代码,并优化其Native编译过程。通过这一优化,可以显著提升数据处理的效率和性能。此外,还探讨了编译过程中可能遇到的问题及其解决方案,为用户提供了一套完整的操作指南。 ... [详细]
  • 在本文中,我们将探讨如何在Docker环境中高效地管理和利用数据库。首先,需要安装Docker Desktop以确保本地环境准备就绪。接下来,可以从Docker Hub中选择合适的数据库镜像,并通过简单的命令将其拉取到本地。此外,我们还将介绍如何配置和优化这些数据库容器,以实现最佳性能和安全性。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • Zookeeper作为Apache Hadoop生态系统中的一个重要组件,主要致力于解决分布式应用中的常见数据管理难题。它提供了统一的命名服务、状态同步服务以及集群管理功能,有效提升了分布式系统的可靠性和可维护性。此外,Zookeeper还支持配置管理和临时节点管理,进一步增强了其在复杂分布式环境中的应用价值。 ... [详细]
author-avatar
尔的依恋
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有