热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

HadoopPigLoadfunc

hadooppig设计的还是很不错的,可以写UDF每一个统计基本上都是要对原始日志进行切分,把想要的一些字段EXTRACT提取出来日志有着基本的模式mac:50:A4:C8:D7:10:7D|build:5141bc99|network:mobile|version:2.4.1|id:taobao22935952431|基本

hadoop pig 设计的还是很不错的,可以写 UDF 每一个统计基本上都是要对原始日志进行切分,把想要的一些字段 EXTRACT 提取出来 日志有着基本的模式 "mac:50:A4:C8:D7:10:7D"|"build:5141bc99"|"network:mobile"|"version:2.4.1"|"id:taobao22935952431"| 基本

hadoop pig 设计的还是很不错的,可以写 UDF

每一个统计基本上都是要对原始日志进行切分,把想要的一些字段 EXTRACT 提取出来

日志有着基本的模式

"mac:50:A4:C8:D7:10:7D"|"build:5141bc99"|"network:mobile"|"version:2.4.1"|"id:taobao22935952431"|

基本上是 key, value对,自定义一个 load function ,指定 key,就可以获取 对应的value,在 pig 中可以使用

REGISTER /jar/kload.jar;
AA = LOAD '/log/load.log' USING kload.KoudaiLoader('mac,build') AS (mac,build);
DUMP AA;

输出结果

(50:A4:C8:D7:10:7D,5141bc99)

koudaiLoader是自己实现的一个 Load function,输出为要获取的key,输出为key所对应的 value

    package kload;
    import java.io.IOException;
    import java.util.*;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.pig.*;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.*;
    import org.apache.pig.data.*;
    public class KoudaiLoader  extends LoadFunc{
            protected RecordReader recordReader = null;
            private String fieldDel = "";
            private String[] reqFildList;
            private ArrayList mProtoTuple = null;
            private TupleFactory mTupleFactory = TupleFactory.getInstance();
            private static final int BUFFER_SIZE = 1024;
            public KoudaiLoader() {
            }
            public KoudaiLoader(String delimiter) {
                    this();
                    if(delimiter == null || delimiter.length() == 0){
                            throw new RuntimeException("empty delimiter");
                    }
                    this.reqFildList=delimiter.split(",");
            }
            @Override
            public Tuple getNext() throws IOException {
                    try {
                            Map tmpMap = new HashMap();
                            List lst = new ArrayList();
                            boolean flag = recordReader.nextKeyValue();
                            int i = 0;
                            if (!flag) {
                                    return null;
                            }
                            Text value = (Text) recordReader.getCurrentValue();
                            tmpMap = this.sourceToMap(value.toString());
                            if( tmpMap == null || tmpMap.size() == 0 ){
                                return null;
                            }
                            for (String s :this.reqFildList){
                                String item = tmpMap.get(s); 
                                if(item == null || item.length() == 0){
                                        item = "";
                                }
                                    lst.add(i++, item);
                            }
                            return TupleFactory.getInstance().newTuple(lst);
                    } catch (InterruptedException e) {
                            throw new ExecException("Read data error", PigException.REMOTE_ENVIRONMENT, e);
                    }
            }
            public Map sourceToMap(String pline){
                    String line = pline;
                    int strLen = 0;
                    String[] strArr;
                    String[] strSubArr;
                    Map mapLog = new HashMap();
                    if(pline == null || pline.length() <= 2){
                        return mapLog;
                    }
                    line = line.trim();
                    strLen = line.length();
                    line = line.substring(1,strLen -1);
                    strArr = line.split("\"\\|\"");
                    if(strArr.length == 0){
                        return mapLog;
                    }
                    for(String s:strArr){
                            if(s != null && s.length() != 0){
                                    strSubArr = s.split(":",2); 
                                    if(strSubArr.length == 2){
                                            mapLog.put(strSubArr[0],strSubArr[1]);
                                    }
                            }
                    }
                    return mapLog;
            }
            @Override
            public void setLocation(String s, Job job) throws IOException {
                FileInputFormat.setInputPaths(job, s);
            }
            @Override
            public InputFormat getInputFormat() throws IOException {
                return new PigTextInputFormat();
            }
            @Override
            public void prepareToRead(RecordReader recordReader, PigSplit pigSplit) throws IOException {
                this.recordReader = recordReader;
            }
    }

编译

javac -cp /usr/local/webserver/pig/pig-0.9.2.jar:.  KoudaiLoader.java

打成jar包

jar -cf kload.jar kload

用pig在本地模式下运行

java -cp /usr/local/webserver/pig/pig-0.9.2.jar:/jar/kload.jar org.apache.pig.Main -x local kload.pig
推荐阅读
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 本文介绍了如何在 MapReduce 作业中使用 SequenceFileOutputFormat 生成 SequenceFile 文件,并详细解释了 SequenceFile 的结构和用途。 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 深入理解云计算与大数据技术
    本文详细探讨了云计算与大数据技术的关键知识点,包括大数据处理平台、社会网络大数据、城市大数据、工业大数据、教育大数据、数据开放与共享的应用,以及搜索引擎与Web挖掘、推荐技术的研究及应用。文章还涵盖了云计算的基础概念、特点和服务类型分类。 ... [详细]
  • Apache Hadoop HDFS QJournalProtocol 中 getJournalCTime 方法的应用与代码实例分析 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 在前一篇文章《Hadoop》系列之“踽踽独行”(二)中,我们详细探讨了云计算的核心概念。本章将重点转向物联网技术,全面解析其基本原理、应用场景及未来发展前景。通过深入分析物联网的架构和技术栈,我们将揭示其在智能城市、工业自动化和智能家居等领域的广泛应用潜力。此外,还将讨论物联网面临的挑战,如数据安全和隐私保护等问题,并展望其在未来技术融合中的重要角色。 ... [详细]
  • Python 数据分析领域不仅拥有高质量的开发环境,还提供了众多功能强大的第三方库。本文将介绍六个关键步骤,帮助读者掌握 Python 数据分析的核心技能,并深入探讨六款虽不广为人知但却极具潜力的数据处理库,如 Pandas 的替代品和新兴的可视化工具,助力数据科学家和分析师提升工作效率。 ... [详细]
  • 如何提升Python处理约1GB数据集时的运行效率?
    如何提升Python处理约1GB数据集时的运行效率?本文探讨了在后端开发中使用Python处理大规模数据集的优化方法。通过分析常见的性能瓶颈,介绍了多种提高数据处理速度的技术,包括使用高效的数据结构、并行计算、内存管理和代码优化策略。此外,文章还提供了在Ubuntu环境下配置和测试这些优化方案的具体步骤,适用于从事推荐系统等领域的开发者。 ... [详细]
  • hive和mysql的区别是什么[mysql教程]
    hive和mysql的区别有:1、查询语言不同,hive是hql语言,MySQL是sql语句;2、数据存储位置不同,hive把数据存储在hdfs上,MySQL把数据存储在自己的系统 ... [详细]
author-avatar
dsgfg
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有