热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

用于大数据的并查集(基于HBase)的java类

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。 首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。

首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)。既然是借助硬盘,那就要文件存取。而又由于在处理过程中需要快速的查找数据是否存在于某个集合内和将数据集合关联等操作,选择使用并查集。

这样选择之后算是有一个解决方案了,但是还需要最后一个关键的部分,就是需要建立文件索引和缓存机制以便快速进行合并和查询过程。这里选择使用的工具还是最趁手的hbase,很好的解决这两个问题。

这个类主要解决的问题就是原始数据的聚类,有关联的聚在一起。核心的两个方法是:

public byte[] findSet(byte[] pos);
public void union(byte[] pos1, byte[] pos2);

其中还有一个

public byte[] findSet(byte[] pos)

是递归实现。两个方法都使用了路径压缩进行优化。union()方法的两个参数有顺序要求,其作用是后者集合连接到前者集合的根节点。

最后,计算的并行是使用MapReduce计算框架。

package recommendsystem;
?
import java.io.IOException;
import java.lang.reflect.Array;
?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
?
public class UnionFindSet {
	private Configuration _conf;
	private HBaseAdmin _hbAdmin;
	private HTable _unionTable;
?
	public static void main(String[] args) throws IOException {
		UnionFindSet ufs = new UnionFindSet("test");
		ufs.union(Bytes.toBytes("7"), Bytes.toBytes("8"));
		ufs.union(Bytes.toBytes("5"), Bytes.toBytes("9"));
		ufs.union(Bytes.toBytes("3"), Bytes.toBytes("7"));
		ufs.union(Bytes.toBytes("4"), Bytes.toBytes("6"));
		ufs.union(Bytes.toBytes("1"), Bytes.toBytes("7"));
		for (int i = 1; i <10; i++) {
			System.out.println(Bytes.toString(ufs.findSet(Bytes.toBytes(String
					.valueOf(i)))));
		}
?
	}
?
	public UnionFindSet(String tableName) throws IOException {
		_cOnf= HBaseConfiguration.create();
		init(tableName);
	}
?
	public UnionFindSet(Configuration conf, String tableName)
			throws IOException {
		// _cOnf=new Configuration(conf);
		_cOnf= conf;
		init(tableName);
	}
?
	public UnionFindSet(HTable htable) {
		_uniOnTable= htable;
	}
?
	public void clear() throws IOException {
		_hbAdmin.close();
		_unionTable.close();
	}
?
	public int printSets() throws IOException {
		Scan scan = new Scan();
		scan.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		scan.setCaching(1000);
		ResultScanner rs = _unionTable.getScanner(scan);
		int count = 0;
		for (Result r : rs) {
			if (arrayCompare(r.getRow(), r.value()) == 0) {
				System.out.println(Bytes.toString(r.getRow()));
				count++;
			}
		}
		return count;
	}
?
	public ResultScanner getSets() throws IOException {
		Scan scan = new Scan();
		scan.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		scan.setCaching(1000);
		return _unionTable.getScanner(scan);
	}
?
	public byte[] RecursionFindSet(byte[] pos) throws IOException {
		byte[] tmpRes, tmpPre = pos;
		Result r = queryUnionHBase(pos);
		if (r.isEmpty()) {
			insertUnionHBase(pos, pos);
			return pos;
		}
		tmpRes = r.value();
		if (arrayCompare(r.value(), pos) != 0) {
			tmpPre = findSet(tmpRes);
			insertUnionHBase(pos, tmpPre);
		}
		return tmpPre;
	}
?
	public byte[] findSet(byte[] pos) throws IOException {
		byte[] tmpRes = pos, tmpPre;
		Result r = queryUnionHBase(pos);
		if (r.isEmpty()) {
			insertUnionHBase(pos, pos);
			return pos;
		}
		tmpPre = r.value();
		while (arrayCompare(tmpRes, tmpPre) != 0) {
			r = queryUnionHBase(tmpPre);
			tmpRes = tmpPre;
			tmpPre = r.value();
		}
		tmpRes = pos;
		while (arrayCompare(tmpPre, tmpRes) != 0) {
			insertUnionHBase(tmpPre, tmpRes);
			r = queryUnionHBase(tmpRes);
			tmpRes = r.value();
		}
		return tmpPre;
	}
?
	public void union(byte[] pos1, byte[] pos2) throws IOException {
		byte[] t1 = findSet(pos1);
		byte[] t2 = findSet(pos2);
		if (arrayCompare(t1, t2) == 0)
			return;
		insertUnionHBase(t2, t1);
	}
?
	private void init(String tableName) throws IOException {
		_hbAdmin = new HBaseAdmin(_conf);
		if (_hbAdmin.tableExists(tableName)) {
			_hbAdmin.disableTable(tableName);
			_hbAdmin.deleteTable(tableName);
		}
		createDB(tableName);
		_uniOnTable= new HTable(_conf, tableName);
	}
?
	private void createDB(String tableName) throws IOException {
		HTableDescriptor hd = new HTableDescriptor(tableName);
		hd.addFamily(new HColumnDescriptor(GlobalName.CLASSIFICATION_FAMILY));
		CreateDB.createTable(_hbAdmin, hd,
				CreateDB.getHexSplits("0", "110000000", 10));
	}
?
	private void insertUnionHBase(byte[] row, byte[] value) throws IOException {
		Put put = new Put(value);
		put.add(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL, row);
		_unionTable.put(put);
	}
?
	private Result queryUnionHBase(byte[] row) throws IOException {
		Get get = new Get(row);
		get.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		return _unionTable.get(get);
	}
?
	private int arrayCompare(byte[] o1, byte[] o2) {
		int len = Array.getLength(o1);
		if (len != Array.getLength(o2)) {
			return -1;
		}
		for (int i = 0; i 
    
推荐阅读
  • HBase运维工具全解析
    本文深入探讨了HBase常用的运维工具,详细介绍了每种工具的功能、使用场景及操作示例。对于HBase的开发人员和运维工程师来说,这些工具是日常管理和故障排查的重要手段。 ... [详细]
  • Hadoop发行版本选择指南:技术解析与应用实践
    本文详细介绍了Hadoop的不同发行版本及其特点,帮助读者根据实际需求选择最合适的Hadoop版本。内容涵盖Apache Hadoop、Cloudera CDH等主流版本的特性及应用场景。 ... [详细]
  • 深入解析Hadoop的核心组件与工作原理
    本文详细介绍了Hadoop的三大核心组件:分布式文件系统HDFS、资源管理器YARN和分布式计算框架MapReduce。通过分析这些组件的工作机制,帮助读者更好地理解Hadoop的架构及其在大数据处理中的应用。 ... [详细]
  • 深入解析Spark核心架构与部署策略
    本文详细探讨了Spark的核心架构,包括其运行机制、任务调度和内存管理等方面,以及四种主要的部署模式:Standalone、Apache Mesos、Hadoop YARN和Kubernetes。通过本文,读者可以深入了解Spark的工作原理及其在不同环境下的部署方式。 ... [详细]
  • Hadoop MapReduce 实战案例:手机流量使用统计分析
    本文通过一个具体的Hadoop MapReduce案例,详细介绍了如何利用MapReduce框架来统计和分析手机用户的流量使用情况,包括上行和下行流量的计算以及总流量的汇总。 ... [详细]
  • HBase 数据复制与灾备同步策略
    本文探讨了HBase在企业级应用中的数据复制与灾备同步解决方案,包括存量数据迁移及增量数据实时同步的方法。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 本文详细分析了Hive在启动过程中遇到的权限拒绝错误,并提供了多种解决方案,包括调整文件权限、用户组设置以及环境变量配置等。 ... [详细]
  • 本文详细探讨了 org.apache.hadoop.ha.HAServiceTarget 类中的 checkFencingConfigured 方法,包括其功能、应用场景及代码示例。通过实际代码片段,帮助开发者更好地理解和使用该方法。 ... [详细]
  • 简化报表生成:EasyReport工具的全面解析
    本文详细介绍了EasyReport,一个易于使用的开源Web报表工具。该工具支持Hadoop、HBase及多种关系型数据库,能够将SQL查询结果转换为HTML表格,并提供Excel导出、图表显示和表头冻结等功能。 ... [详细]
  • 本文探讨了如何在Hive(基于Hadoop)环境中编写类似SQL的语句,以去除字段中的空格。特别是在处理邮政编码等数据时,去除特定位置的空格是常见的需求。 ... [详细]
  • 本文详细介绍如何使用 Apache Spark 执行基本任务,包括启动 Spark Shell、运行示例程序以及编写简单的 WordCount 程序。同时提供了参数配置的注意事项和优化建议。 ... [详细]
  • 本文探讨了Hive作业中Map任务数量的确定方式,主要涉及HiveInputFormat和CombineHiveInputFormat两种InputFormat的分片计算逻辑。通过调整相关参数,可以有效控制Map任务的数量,进而优化Hive作业的性能。 ... [详细]
  • 深入解析BookKeeper的设计与应用场景
    本文介绍了由Yahoo在2009年开发并于2011年开源的BookKeeper技术。BookKeeper是一种高效且可靠的日志流存储解决方案,广泛应用于需要高性能和强数据持久性的场景。 ... [详细]
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
author-avatar
平凡小柏
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有