热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

用于大数据的并查集(基于HBase)的java类

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。 首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。

首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)。既然是借助硬盘,那就要文件存取。而又由于在处理过程中需要快速的查找数据是否存在于某个集合内和将数据集合关联等操作,选择使用并查集。

这样选择之后算是有一个解决方案了,但是还需要最后一个关键的部分,就是需要建立文件索引和缓存机制以便快速进行合并和查询过程。这里选择使用的工具还是最趁手的hbase,很好的解决这两个问题。

这个类主要解决的问题就是原始数据的聚类,有关联的聚在一起。核心的两个方法是:

public byte[] findSet(byte[] pos);
public void union(byte[] pos1, byte[] pos2);

其中还有一个

public byte[] findSet(byte[] pos)

是递归实现。两个方法都使用了路径压缩进行优化。union()方法的两个参数有顺序要求,其作用是后者集合连接到前者集合的根节点。

最后,计算的并行是使用MapReduce计算框架。

package recommendsystem;
?
import java.io.IOException;
import java.lang.reflect.Array;
?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
?
public class UnionFindSet {
	private Configuration _conf;
	private HBaseAdmin _hbAdmin;
	private HTable _unionTable;
?
	public static void main(String[] args) throws IOException {
		UnionFindSet ufs = new UnionFindSet("test");
		ufs.union(Bytes.toBytes("7"), Bytes.toBytes("8"));
		ufs.union(Bytes.toBytes("5"), Bytes.toBytes("9"));
		ufs.union(Bytes.toBytes("3"), Bytes.toBytes("7"));
		ufs.union(Bytes.toBytes("4"), Bytes.toBytes("6"));
		ufs.union(Bytes.toBytes("1"), Bytes.toBytes("7"));
		for (int i = 1; i <10; i++) {
			System.out.println(Bytes.toString(ufs.findSet(Bytes.toBytes(String
					.valueOf(i)))));
		}
?
	}
?
	public UnionFindSet(String tableName) throws IOException {
		_cOnf= HBaseConfiguration.create();
		init(tableName);
	}
?
	public UnionFindSet(Configuration conf, String tableName)
			throws IOException {
		// _cOnf=new Configuration(conf);
		_cOnf= conf;
		init(tableName);
	}
?
	public UnionFindSet(HTable htable) {
		_uniOnTable= htable;
	}
?
	public void clear() throws IOException {
		_hbAdmin.close();
		_unionTable.close();
	}
?
	public int printSets() throws IOException {
		Scan scan = new Scan();
		scan.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		scan.setCaching(1000);
		ResultScanner rs = _unionTable.getScanner(scan);
		int count = 0;
		for (Result r : rs) {
			if (arrayCompare(r.getRow(), r.value()) == 0) {
				System.out.println(Bytes.toString(r.getRow()));
				count++;
			}
		}
		return count;
	}
?
	public ResultScanner getSets() throws IOException {
		Scan scan = new Scan();
		scan.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		scan.setCaching(1000);
		return _unionTable.getScanner(scan);
	}
?
	public byte[] RecursionFindSet(byte[] pos) throws IOException {
		byte[] tmpRes, tmpPre = pos;
		Result r = queryUnionHBase(pos);
		if (r.isEmpty()) {
			insertUnionHBase(pos, pos);
			return pos;
		}
		tmpRes = r.value();
		if (arrayCompare(r.value(), pos) != 0) {
			tmpPre = findSet(tmpRes);
			insertUnionHBase(pos, tmpPre);
		}
		return tmpPre;
	}
?
	public byte[] findSet(byte[] pos) throws IOException {
		byte[] tmpRes = pos, tmpPre;
		Result r = queryUnionHBase(pos);
		if (r.isEmpty()) {
			insertUnionHBase(pos, pos);
			return pos;
		}
		tmpPre = r.value();
		while (arrayCompare(tmpRes, tmpPre) != 0) {
			r = queryUnionHBase(tmpPre);
			tmpRes = tmpPre;
			tmpPre = r.value();
		}
		tmpRes = pos;
		while (arrayCompare(tmpPre, tmpRes) != 0) {
			insertUnionHBase(tmpPre, tmpRes);
			r = queryUnionHBase(tmpRes);
			tmpRes = r.value();
		}
		return tmpPre;
	}
?
	public void union(byte[] pos1, byte[] pos2) throws IOException {
		byte[] t1 = findSet(pos1);
		byte[] t2 = findSet(pos2);
		if (arrayCompare(t1, t2) == 0)
			return;
		insertUnionHBase(t2, t1);
	}
?
	private void init(String tableName) throws IOException {
		_hbAdmin = new HBaseAdmin(_conf);
		if (_hbAdmin.tableExists(tableName)) {
			_hbAdmin.disableTable(tableName);
			_hbAdmin.deleteTable(tableName);
		}
		createDB(tableName);
		_uniOnTable= new HTable(_conf, tableName);
	}
?
	private void createDB(String tableName) throws IOException {
		HTableDescriptor hd = new HTableDescriptor(tableName);
		hd.addFamily(new HColumnDescriptor(GlobalName.CLASSIFICATION_FAMILY));
		CreateDB.createTable(_hbAdmin, hd,
				CreateDB.getHexSplits("0", "110000000", 10));
	}
?
	private void insertUnionHBase(byte[] row, byte[] value) throws IOException {
		Put put = new Put(value);
		put.add(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL, row);
		_unionTable.put(put);
	}
?
	private Result queryUnionHBase(byte[] row) throws IOException {
		Get get = new Get(row);
		get.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		return _unionTable.get(get);
	}
?
	private int arrayCompare(byte[] o1, byte[] o2) {
		int len = Array.getLength(o1);
		if (len != Array.getLength(o2)) {
			return -1;
		}
		for (int i = 0; i 
    
推荐阅读
  • HBase 数据复制与灾备同步策略
    本文探讨了HBase在企业级应用中的数据复制与灾备同步解决方案,包括存量数据迁移及增量数据实时同步的方法。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 本文介绍了如何在 MapReduce 作业中使用 SequenceFileOutputFormat 生成 SequenceFile 文件,并详细解释了 SequenceFile 的结构和用途。 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 从理想主义者的内心深处萌发的技术信仰,推动了云原生技术在全球范围内的快速发展。本文将带你深入了解阿里巴巴在开源领域的贡献与成就。 ... [详细]
  • 精选10款Python框架助力并行与分布式机器学习
    随着神经网络模型的不断深化和复杂化,训练这些模型变得愈发具有挑战性,不仅需要处理大量的权重,还必须克服内存限制等问题。本文将介绍10款优秀的Python框架,帮助开发者高效地实现分布式和并行化的深度学习模型训练。 ... [详细]
  • 本文介绍如何通过整合SparkSQL与Hive来构建高效的用户画像环境,提高数据处理速度和查询效率。 ... [详细]
  • Hadoop Datanode DataXceiver 错误处理问题
    Ambari 每分钟会向 Datanode 发送一次“ping”请求以确保其正常运行。然而,Datanode 在处理空内容时没有相应的逻辑,导致出现错误。 ... [详细]
  • 本文介绍了Hadoop的核心组件,包括高可靠性和高吞吐量的分布式文件系统HDFS、分布式的离线并行计算框架MapReduce、作业调度与集群资源管理框架YARN以及支持其他模块的工具模块Common。 ... [详细]
  • 大数据领域的职业路径与角色解析
    本文将深入探讨大数据领域的各种职业和工作角色,帮助读者全面了解大数据行业的需求、市场趋势,以及从入门到高级专业人士的职业发展路径。文章还将详细介绍不同公司对大数据人才的需求,并解析各岗位的具体职责、所需技能和经验。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
author-avatar
平凡小柏
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有