热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

java使用hadoop实现关联商品统计

本篇文章java使用hadoop实现关联商品统计,可以实现商品的关联统计,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。

最近几天一直在看Hadoop相关的书籍,目前稍微有点感觉,自己就仿照着WordCount程序自己编写了一个统计关联商品。

需求描述:

根据超市的销售清单,计算商品之间的关联程度(即统计同时买A商品和B商品的次数)。

数据格式:

超市销售清单简化为如下格式:一行表示一个清单,每个商品采用 "," 分割,如下图所示:

需求分析:

采用hadoop中的mapreduce对该需求进行计算。

map函数主要拆分出关联的商品,输出结果为 key为商品A,value为商品B,对于第一条三条结果拆分结果如下图所示:

这里为了统计出和A、B两件商品想关联的商品,所以商品A、B之间的关系输出两条结果即 A-B、B-A。

reduce函数分别对和商品A相关的商品进行分组统计,即分别求value中的各个商品出现的次数,输出结果为key为商品A|商品B,value为该组合出现的次数。针对上面提到的5条记录,对map输出中key值为R的做下分析:

通过map函数的处理,得到如下图所示的记录:

reduce中对map输出的value值进行分组计数,得到的结果如下图所示

将商品A B作为key,组合个数作为value输出,输出结果如下图所示:

对于需求的实现过程的分析到目前就结束了,下面就看下具体的代码实现

代码实现:

关于代码就不做详细的介绍,具体参照代码之中的注释吧。

package com; 
 
import java.io.IOException; 
import java.util.HashMap; 
import java.util.Map.Entry; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
 
public class Test extends Configured implements Tool{ 
 
  /** 
   * map类,实现数据的预处理 
   * 输出结果key为商品A value为关联商品B 
   * @author lulei 
   */ 
  public static class MapT extends Mapper { 
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 
      String line = value.toString(); 
      if (!(line == null || "".equals(line))) { 
        //分割商品 
        String []vs = line.split(","); 
        //两两组合,构成一条记录 
        for (int i = 0; i <(vs.length - 1); i++) { 
          if ("".equals(vs[i])) {//排除空记录 
            continue; 
          } 
          for (int j = i+1; j  { 
    private int count; 
     
    /** 
     * 初始化 
     */ 
    public void setup(Context context) { 
      //从参数中获取最小记录个数 
      String countStr = context.getConfiguration().get("count"); 
      try { 
        this.count = Integer.parseInt(countStr); 
      } catch (Exception e) { 
        this.count = 0; 
      } 
    } 
    public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException{ 
      String keyStr = key.toString(); 
      HashMap hashMap = new HashMap(); 
      //利用hash统计B商品的次数 
      for (Text value : values) { 
        String valueStr = value.toString(); 
        if (hashMap.containsKey(valueStr)) { 
          hashMap.put(valueStr, hashMap.get(valueStr) + 1); 
        } else { 
          hashMap.put(valueStr, 1); 
        } 
      } 
      //将结果输出 
      for (Entry entry : hashMap.entrySet()) { 
        if (entry.getValue() >= this.count) {//只输出次数不小于最小值的 
          context.write(new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue())); 
        } 
      } 
    } 
  } 
   
  @Override 
  public int run(String[] arg0) throws Exception { 
    // TODO Auto-generated method stub 
    Configuration cOnf= getConf(); 
    conf.set("count", arg0[2]); 
     
    Job job = new Job(conf); 
    job.setJobName("jobtest"); 
     
    job.setOutputFormatClass(TextOutputFormat.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
     
    job.setMapperClass(MapT.class); 
    job.setReducerClass(ReduceT.class); 
     
    FileInputFormat.addInputPath(job, new Path(arg0[0])); 
    FileOutputFormat.setOutputPath(job, new Path(arg0[1])); 
     
    job.waitForCompletion(true); 
     
    return job.isSuccessful() &#63; 0 : 1; 
     
  } 
   
  /** 
   * @param args 
   */ 
  public static void main(String[] args) { 
    // TODO Auto-generated method stub 
    if (args.length != 3) { 
      System.exit(-1); 
    } 
    try { 
      int res = ToolRunner.run(new Configuration(), new Test(), args); 
      System.exit(res); 
    } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
    } 
  } 
 
} 

上传运行:

将程序打包成jar文件,上传到机群之中。将测试数据也上传到HDFS分布式文件系统中。

命令运行截图如下图所示:

运行结束后查看相应的HDFS文件系统,如下图所示:

到此一个完整的mapreduce程序就完成了,关于hadoop的学习,自己还将继续~感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


推荐阅读
  • 深入解析Hadoop的核心组件与工作原理
    本文详细介绍了Hadoop的三大核心组件:分布式文件系统HDFS、资源管理器YARN和分布式计算框架MapReduce。通过分析这些组件的工作机制,帮助读者更好地理解Hadoop的架构及其在大数据处理中的应用。 ... [详细]
  • Hadoop入门与核心组件详解
    本文详细介绍了Hadoop的基础知识及其核心组件,包括HDFS、MapReduce和YARN。通过本文,读者可以全面了解Hadoop的生态系统及应用场景。 ... [详细]
  • HBase运维工具全解析
    本文深入探讨了HBase常用的运维工具,详细介绍了每种工具的功能、使用场景及操作示例。对于HBase的开发人员和运维工程师来说,这些工具是日常管理和故障排查的重要手段。 ... [详细]
  • Hadoop发行版本选择指南:技术解析与应用实践
    本文详细介绍了Hadoop的不同发行版本及其特点,帮助读者根据实际需求选择最合适的Hadoop版本。内容涵盖Apache Hadoop、Cloudera CDH等主流版本的特性及应用场景。 ... [详细]
  • 深入解析BookKeeper的设计与应用场景
    本文介绍了由Yahoo在2009年开发并于2011年开源的BookKeeper技术。BookKeeper是一种高效且可靠的日志流存储解决方案,广泛应用于需要高性能和强数据持久性的场景。 ... [详细]
  • MapReduce原理是怎么剖析的
    这期内容当中小编将会给大家带来有关MapReduce原理是怎么剖析的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1 ... [详细]
  • 深入解析Spark核心架构与部署策略
    本文详细探讨了Spark的核心架构,包括其运行机制、任务调度和内存管理等方面,以及四种主要的部署模式:Standalone、Apache Mesos、Hadoop YARN和Kubernetes。通过本文,读者可以深入了解Spark的工作原理及其在不同环境下的部署方式。 ... [详细]
  • 深入浅出:Hadoop架构详解
    Hadoop作为大数据处理的核心技术,包含了一系列组件如HDFS(分布式文件系统)、YARN(资源管理框架)和MapReduce(并行计算模型)。本文将通过实例解析Hadoop的工作原理及其优势。 ... [详细]
  • 初探Hadoop:第一章概览
    本文深入探讨了《Hadoop》第一章的内容,重点介绍了Hadoop的基本概念及其如何解决大数据处理中的关键挑战。 ... [详细]
  • Hadoop MapReduce 实战案例:手机流量使用统计分析
    本文通过一个具体的Hadoop MapReduce案例,详细介绍了如何利用MapReduce框架来统计和分析手机用户的流量使用情况,包括上行和下行流量的计算以及总流量的汇总。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 构建Filebeat-Kafka-Logstash-ElasticSearch-Kibana日志收集体系
    本文介绍了如何使用Filebeat、Kafka、Logstash、ElasticSearch和Kibana构建一个高效、可扩展的日志收集与分析系统。各组件分别承担不同的职责,确保日志数据能够被有效收集、处理、存储及可视化。 ... [详细]
  • 本文介绍了Hive作为基于Hadoop的数据仓库工具的核心概念,包括其基本功能、使用理由、特点以及与Hadoop的关系。同时,文章还探讨了Hive相较于传统关系型数据库的不同之处,并展望了Hive的发展前景。 ... [详细]
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • HBase 数据复制与灾备同步策略
    本文探讨了HBase在企业级应用中的数据复制与灾备同步解决方案,包括存量数据迁移及增量数据实时同步的方法。 ... [详细]
author-avatar
手机用户2602897795
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有