热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Actor并发编程模型实现多文件的单词统计(完整源码)

给定几个文本文件(文本文件都是以空格分隔的),使用Actor并发编程来统计单词的数量思路实现思路MainActor获取要进行单词统计的

给定几个文本文件(文本文件都是以空格分隔的),使用Actor并发编程来统计单词的数量

思路

在这里插入图片描述

实现思路

  1. MainActor获取要进行单词统计的文件
  2. 根据文件数量创建对应的WordCountActor
  3. 将文件名封装为消息发送给WordCountActor
  4. WordCountActor接收消息,并统计单个文件的单词计数
  5. 将单词计数结果发送给MainActor
  6. MainActor等待所有的WordCountActor都已经成功返回消息,然后进行结果合并

步骤1 | 获取文件列表

实现思路

在main方法中读取指定目录(${project_root_dir}/data/)下的所有文件,并打印所有的文件名


实现步骤

  1. 创建用于测试的数据文件
  2. 加载工程根目录,获取到所有文件
  3. 将每一个文件名,添加目录路径
  4. 打印所有文件名

步骤2 | 创建WordCountActor

实现思路

根据文件数量创建WordCountActor,为了方便后续发送消息给Actor,将每个Actor与文件名关联在一起


实现步骤

  1. 创建WordCountActor
  2. 将文件列表转换为WordCountActor
  3. 为了后续方便发送消息给Actor,将Actor列表和文件列表拉链到一起
  4. 打印测试

步骤3 | 启动Actor/发送/接收任务消息

实现思路

启动所有WordCountActor,并发送单词统计任务消息给每个WordCountActor

[!NOTE]

此处应发送异步有返回消息



实现步骤

  1. 创建一个WordCountTask样例类消息,封装要进行单词计数的文件名
  2. 启动所有WordCountTask,并发送异步有返回消息
  3. 获取到所有的WordCount中获取到的消息(封装到一个Future列表中)
  4. 在WordCountActor中接收并打印消息

步骤4 | 消息统计文件单词计数

实现思路

读取文件文本,并统计出来单词的数量。例如:

(hadoop, 3), (spark, 1)...


实现步骤

  1. 读取文件内容,并转换为列表
  2. 按照空格切割文本,并转换为一个一个的单词
  3. 为了方便进行计数,将单词转换为元组
  4. 按照单词进行分组,然后再进行聚合统计
  5. 打印聚合统计结果

步骤5 | 封装单词计数结果回复给MainActor

实现思路

  • 将单词计数的结果封装为一个样例类消息,并发送给MainActor
  • MainActor等待所有WordCount均已返回后获取到每个WordCountActor单词计算后的结果


实现步骤

  1. 定义一个样例类封装单词计数结果
  2. 将单词计数结果发送给MainActor
  3. MainActor中检测所有WordActor是否均已返回,如果均已返回,则获取并转换结果
  4. 打印结果

步骤6 | 结果合并

实现思路

对接收到的所有单词计数进行合并。因为该部分已经在WordCountActor已经编写过,所以抽取这部分一样的代码到一个工具类中,再调用合并得到最终结果

实现步骤

  1. 创建一个用于单词合并的工具类
  2. 抽取重复代码为一个方法
  3. 在MainActor调用该合并方法,计算得到最终结果,并打印

程序

在这里插入图片描述

1.txt

hadoop sqoop hadoop
hadoop hadoop flume
hadoop hadoop hadoop
spark

2.txt

flink hadoop hive
hadoop sqoop hadoop
hadoop hadoop hadoop
spark

MainActor.scala

package com.xuimport java.io.Fileimport scala.actors.Futureobject MainActor {def main(args: Array[String]): Unit = {// 1. 加载指定目录的文件列表// 1.1. 加载指定目录的数据文件val DIR_PATH = "./data/"// 获取到指定目录下的所有数据文件名val fileNameList = new File(DIR_PATH).list().toList// 1.2. 将数据文件添加上一个目录val fileDirNameList = fileNameList.map(DIR_PATH + _)// 1.3. 打印所有的文件名println(fileDirNameList)// 2. 创建Actor关联文件// 2.1. 创建Actorval wordCountActorList = fileNameList.map {fileName => new WordCountActor}// 2.2. 将Actor和文件名关联在一起val actorFileNameList: List[(WordCountActor, String)] = wordCountActorList.zip(fileDirNameList)println(actorFileNameList)// 3. 启动Actor/发送/接收消息val futureList = actorFileNameList.map {actorFileName =>val actor = actorFileName._1// 启动Actoractor.start()// 发送消息到Actor中,发送的是异步还有返回的消息val future: Future[Any] = actor !! WordCountTask(actorFileName._2)future}// 编写一个while循环来等待所有的Actor都已经返回数据while(futureList.filter(!_.isSet).size != 0) {}// 获取Future中封装的数据val wordCountResultList = futureList.map(_.apply().asInstanceOf[WordCountResult])// 获取样例类中封装的单词统计结果// List[Map(hadoop->3, spark->1), Map(hadoop->2, flink->1)]val wordCountResultMap: List[Map[String, Int]] = wordCountResultList.map(_.wordCountMap)// List[hadoop->3, spark->1, hadoop->2, flink->1]val resultList = WordCountUtil.reduce(wordCountResultMap.flatten)println(resultList)}
}

MessagePackage.scala

package com.xu/*** 单词统计任务消息* @param fileName 文件名*/
case class WordCountTask(fileName:String)/*** 封装单词统计的结果* @param wordCountMap 单词-单词在某个文件中出现的数量*/
case class WordCountResult(wordCountMap: Map[String, Int])

WordCountActor.scala

package com.xuimport scala.actors.Actor
import scala.io.Sourceclass WordCountActor extends Actor{override def act(): Unit = {loop {react{case WordCountTask(fileName) =>println("接收到任务:对" + fileName + "进行单词统计")// 1. 读取文件,转换为列表// hadoop sqoop hadoopval lineList = Source.fromFile(fileName).getLines().toList// 2. 切割字符串,转换成一个一个的单词// [hadoop, sqoop, hadoop]val wordList: List[String] = lineList.flatMap(_.split(" "))// 3. 将单词转换为一个元组// [, , ]val wordAndCountList: List[(String, Int)] = wordList.map(_ -> 1)// 4. 调用工具类方法来对单词元组列表进行分组、聚合val wordCountMap: Map[String, Int] = WordCountUtil.reduce(wordAndCountList)// 5. 打印测试println(wordCountMap)// 6. 将统计数据封装到一个样例类中,发送给MainActorsender ! WordCountResult(wordCountMap)}}}
}

WordCountUtil.scala

package com.xuobject WordCountUtil {// 单词数据合并def reduce(wordAndCountList: List[(String, Int)]) = {// 4. 分组、聚合计算// {hadoop->List(,), sqoop->List()}val groupedMap: Map[String, List[(String, Int)]] = wordAndCountList.groupBy(_._1)// 聚合计算// {hadoop->2, sqoop->1}val wordCountMap: Map[String, Int] = groupedMap.map{keyVal =>keyVal._1 -> keyVal._2.map(_._2).sum}wordCountMap}
}


推荐阅读
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 通过马老师的视频学习了Java中的容器相关内容,包括Collection、Set、List、Map及其常见实现类,并深入了解了这些容器的基本操作方法。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 美团优选推荐系统架构师 L7/L8:算法与工程深度融合 ... [详细]
  • 字节跳动深圳研发中心安全业务团队正在火热招募人才! ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 深入理解Spark框架:RDD核心概念与操作详解
    RDD是Spark框架的核心计算模型,全称为弹性分布式数据集(Resilient Distributed Dataset)。本文详细解析了RDD的基本概念、特性及其在Spark中的关键操作,包括创建、转换和行动操作等,帮助读者深入理解Spark的工作原理和优化策略。通过具体示例和代码片段,进一步阐述了如何高效利用RDD进行大数据处理。 ... [详细]
  • Storm集成Kakfa
    一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ... [详细]
  • 问题详情无奈于,在kafka里没有一个较好自带的webui。启动后无法观看,并且不友好。所以,需安装一个第三方的kafka管理工具为了简化开发者和服务工程师维护 ... [详细]
  • Flink1.10定义UDAGG遇到SQL
    按照以下代码测试定义的UDAGG会一直出现org.apache.flink.table.api.ValidationException:SQLvalidationfailed.nu ... [详细]
  • 转载:https:blog.csdn.nethigh2011articledetails70155431清华大学镜像网:https:mirrors.tu ... [详细]
author-avatar
看破红尘红尘看破_728
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有