热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mahout关联规则源码分析Part1

最近看了关联规则的相关算法,着重看了mahout的具体实现,mahout官网上面给出了好多算法,具体网址如下:https:c

最近看了关联规则的相关算法,着重看了mahout的具体实现,mahout官网上面给出了好多算法,具体网址如下:https://cwiki.apache.org/confluence/display/MAHOUT/Parallel+Frequent+Pattern+Mining 。

先说下命令行运行关联规则,关联规则的算法在mahout-core-0,7.jar包下面,命令行运行如下:

fansy@fansypc:~/hadoop-1.0.2$ bin/hadoop jar ../mahout-pure-0.7/core/target/mahout-core-0.7.jar
org.apache.mahout.fpm.pfpgrowth.FPGrowthDriver -i input/retail.dat -o date1101/fpgrowthdriver00 -s 2 -method mapreduce -regex '[\ ]'
12/11/01 16:31:39 INFO common.AbstractJob:
Command line arguments: {--encoding=[UTF-8], --endPhase=[2147483647],
--input=[input/retail.dat], --maxHeapSize=[50], --method=[mapreduce], --minSupport=[2], --numGroups=[1000],
--numTreeCacheEntries=[5], --output=[date1101/fpgrowthdriver00], --splitterPattern=[[\ ]], --startPhase=[0], --tempDir=[temp]}
最后的 -regex '[\ ]' 一定是需要的对于输入数据 retail.dat来说,因为mahout默认的item的分隔符是没有空格的;

而且这里只讨论 并行的程序,所以使用 -method mapreduce

下面分析源码:

在分析源码之前,先看一张图:


这张图很好的说明了mahout实现关联规则思想,或者说是流程;

首先,读入数据,比如上图的5个transactions(事务),接着根据一张总表(这张总表是每个item的次数从大到小的一个排列,同时这张表还去除了出现次数小于min_support的item)把这些transactions 去除一些项目并按照总表的顺序排序,得到另外的一个transaction A,接着map的输出就是根据transaction A输出规则,从出现次数最小的item开始输出直到出现次数第二大的item。

Reduce收集map输出相同的key值,把他们的value值放一个集合set 中,然后在统计这些集合中item出现的次数,如果次数大于min_confidence(本例中为3),那么就输出key和此item的规则;

命令行运行时可以看到三个MR,即可以把关联规则的算法分为三部分,但是个人觉得可以分为四个部分,其中的一部分就是总表的获得;鉴于目前本人只看了一个MR和总表的获得部分的源码,今天就只分享这两个部分;

贴代码先,基本都是源码来的,只是稍微改了下:

第一个MR的驱动程序:PFGrowth_ParallelCounting.java:

package org.fansy.date1101.pfgrowth;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.common.HadoopUtil;
public class PFGrowth_ParallelCounting {public boolean runParallelCountingJob(String input,String output) throws IOException, ClassNotFoundException, InterruptedException{Configuration conf=new Configuration();Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);job.setJarByClass(PFGrowth_ParallelCounting.class);job.setMapperClass(PFGrowth_ParallelCountingM.class);job.setCombinerClass(PFGrowth_ParallelCountingR.class);job.setReducerClass(PFGrowth_ParallelCountingR.class);job.setOutputFormatClass(SequenceFileOutputFormat.class); // get rid of this line you can get the text filejob.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job,new Path( input));Path outPut=new Path(output,"parallelcounting");HadoopUtil.delete(conf, outPut);FileOutputFormat.setOutputPath(job, outPut); boolean succeeded = job.waitForCompletion(true);if (!succeeded) {throw new IllegalStateException("Job failed!");} return succeeded;}
}
第一个MR的M:PFGrowth_ParallelCountingM.java:

package org.fansy.date1101.pfgrowth;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class PFGrowth_ParallelCountingM extends Mapper {private static final LongWritable ONE = new LongWritable(1);private Pattern splitter=Pattern.compile("[ ,\t]*[ ,|\t][ ,\t]*");@Overrideprotected void map(LongWritable offset, Text input, Context context) throws IOException,InterruptedException {String[] items = splitter.split(input.toString());for (String item : items) {if (item.trim().isEmpty()) {continue;}context.setStatus("Parallel Counting Mapper: " + item);context.write(new Text(item), ONE);}}
}
上面的代码中的间隔符号修改了源码,加上了空格;

第一个MR的R:PFGrowth_ParallelCountingR.java:

package org.fansy.date1101.pfgrowth;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class PFGrowth_ParallelCountingR extends Reducer{protected void reduce(Text key, Iterable values, Context context) throws IOException,InterruptedException {long sum = 0;for (LongWritable value : values) {context.setStatus("Parallel Counting Reducer :" + key);sum += value.get();}context.setStatus("Parallel Counting Reducer: " + key + " => " + sum);context.write(key, new LongWritable(sum));}
}
其实第一个MR还是比较好理解的,M分解每个transaction的item,然后输出,然后R针对每个item_id 把value值相加求和,这个和wordcount的例子是一样的,当然这里也可以加combine操作的。

接着是总表的获得:

PFGrowth_Driver.java ,同时这个程序也调用第一个MR,也就是说可以直接运行这个文件就可以同时运行第一个MR和获得总表了。

package org.fansy.date1101.pfgrowth;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.Parameters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import com.google.common.collect.Lists;
class MyComparator implements Comparator>{@Overridepublic int compare(Pair o1, Pair o2) {int ret = o2.getSecond().compareTo(o1.getSecond());if (ret != 0) {return ret;}return o1.getFirst().compareTo(o2.getFirst());}
}
public class PFGrowth_Driver {public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException{if(args.length!=3){System.out.println("wrong input args");System.out.println("usage: ");System.exit(-1);}// set parametersParameters params=new Parameters();params.set("INPUT", args[0]);params.set("OUTPUT", args[1]);params.set("MIN_SUPPORT", args[2]);// get parametersString input=params.get("INPUT");String output=params.get("OUTPUT");// run the first jobPFGrowth_ParallelCounting ppc=new PFGrowth_ParallelCounting();ppc.runParallelCountingJob(input, output); // read input and set the fListList> fList = readFList(params);Configuration conf=new Configuration();saveFList(fList, params, conf); } /*** Serializes the fList and returns the string representation of the List* * @return Serialized String representation of List*/public static void saveFList(Iterable> flist, Parameters params, Configuration conf)throws IOException {Path flistPath = new Path(params.get("OUTPUT"), "fList");FileSystem fs = FileSystem.get(flistPath.toUri(), conf);flistPath = fs.makeQualified(flistPath);HadoopUtil.delete(conf, flistPath);SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class);try {for (Pair pair : flist) {writer.append(new Text(pair.getFirst()), new LongWritable(pair.getSecond()));}} finally {writer.close();}DistributedCache.addCacheFile(flistPath.toUri(), conf);}public static List> readFList(Parameters params) {int minSupport = Integer.valueOf(params.get("MIN_SUPPORT"));Configuration conf = new Configuration(); Path parallelCountingPath = new Path(params.get("OUTPUT"),"parallelcounting");// add MyComparatorPriorityQueue> queue = new PriorityQueue>(11,new MyComparator());// sort according to the occur times from large to small for (Pair record: new SequenceFileDirIterable(new Path(parallelCountingPath, "part-*"),PathType.GLOB, null, null, true, conf)) {long value = record.getSecond().get();if (value >= minSupport) { // get rid of the item which is below the minimum supportqueue.add(new Pair(record.getFirst().toString(), value));}}List> fList = Lists.newArrayList();while (!queue.isEmpty()) {fList.add(queue.poll());}return fList;}
}
第一个MR运行完毕后,调用readFList()函数,把第一个MR的输出按照item出现的次数从大到小放入一个列表List中,然后调用saveFList()函数把上面求得的List存入HDFS文件中,不过存入的格式是被序列话的,可以另外编写函数查看文件是否和自己的假设相同;

FList 文件反序列化如下:





分享,快乐,成长







推荐阅读
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • PDO MySQL
    PDOMySQL如果文章有成千上万篇,该怎样保存?数据保存有多种方式,比如单机文件、单机数据库(SQLite)、网络数据库(MySQL、MariaDB)等等。根据项目来选择,做We ... [详细]
  • Question该提问来源于开源项目:react-native-device-info/react-native-device-info ... [详细]
  • 基于词向量计算文本相似度1.测试数据:链接:https:pan.baidu.coms1fXJjcujAmAwTfsuTg2CbWA提取码:f4vx2.实验代码:imp ... [详细]
  • 本文介绍了如何找到并终止在8080端口上运行的进程的方法,通过使用终端命令lsof -i :8080可以获取在该端口上运行的所有进程的输出,并使用kill命令终止指定进程的运行。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
  • 解决.net项目中未注册“microsoft.ACE.oledb.12.0”提供程序的方法
    在开发.net项目中,通过microsoft.ACE.oledb读取excel文件信息时,报错“未在本地计算机上注册“microsoft.ACE.oledb.12.0”提供程序”。本文提供了解决这个问题的方法,包括错误描述和代码示例。通过注册提供程序和修改连接字符串,可以成功读取excel文件信息。 ... [详细]
  • 本文介绍了一个编程问题,要求求解一个给定n阶方阵的鞍点个数。通过输入格式的描述,可以了解到输入的是一个n阶方阵,每个元素都是整数。通过输出格式的描述,可以了解到输出的是鞍点的个数。通过题目集全集传送门,可以了解到提供了两个函数is_line_max和is_rank_min,用于判断一个元素是否为鞍点。本文还提供了三个样例,分别展示了不同情况下的输入和输出。 ... [详细]
  • Python的参数解析argparse模块的学习
    本文介绍了Python中参数解析的重要模块argparse的学习内容。包括位置参数和可选参数的定义和使用方式,以及add_argument()函数的详细参数关键字解释。同时还介绍了命令行参数的操作和可接受数量的设置,其中包括整数类型的参数。通过学习本文内容,可以更好地理解和使用argparse模块进行参数解析。 ... [详细]
  • 【Python 爬虫】破解按照顺序点击验证码(非自动化浏览器)
    #请求到验证码base64编码json_img_datajson_raw.get(Vimage)#获取到验证码编码 #保存验证码图片到本地defbase64_to_img(bstr ... [详细]
  • 动态多点××× 单云双HUB
    动态多点是一个高扩展的IPSEC解决方案传统的ipsecS2S有如下劣势1.中心站点配置量大,无论是采用经典ipsec***还是采用greoveripsec多一个分支 ... [详细]
  • 做实验需要重命名数据集的名字,有几个容易踩坑的地方和小技巧,总结一下importospathfilelistos.listdir(path)#文件夹路 ... [详细]
author-avatar
fffas2010_734_196
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有