热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

玩转大数据系列之ApachePig如何与ApacheSolr集成(二)

散仙,在上篇文章中介绍了,如何使用ApachePig与Lucene集成,还不知道的道友们,可以先看下上篇,熟悉下具体的流程。?在与Lucene集成过程中,我们发现最终还要把生成的L

散仙,在上篇文章中介绍了,如何使用Apache Pig与Lucene集成,还不知道的道友们,可以先看下上篇,熟悉下具体的流程。 
在与Lucene集成过程中,我们发现最终还要把生成的Lucene索引,拷贝至本地磁盘,才能提供检索服务,这样以来,比较繁琐,而且有以下几个缺点: 

(一)在生成索引以及最终能提供正常的服务之前,索引经过多次落地操作,这无疑会给磁盘和网络IO,带来巨大影响 

(二)Lucene的Field的配置与其UDF函数的代码耦合性过强,而且提供的配置也比较简单,不太容易满足,灵活多变的检索需求和服务,如果改动索引配置,则有可能需要重新编译源码。 

(三)对Hadoop的分布式存储系统HDFS依赖过强,如果使用与Lucene集成,那么则意味着你提供检索的Web服务器,则必须跟hadoop的存储节点在一个机器上,否则,无法从HDFS上下拉索引,除非你自己写程序,或使用scp再次从目标机传输,这样无疑又增加了,系统的复杂性。 


鉴于有以上几个缺点,所以建议大家使用Solr或ElasticSearch这样的封装了Lucene更高级的API框架,那么Solr与ElasticSearch和Lucene相比,又有什么优点呢? 

(1)在最终的写入数据时,我们可以直接最终结果写入solr或es,同时也可以在HDFS上保存一份,作为灾备。 

(2)使用了solr或es,这时,我们字段的配置完全与UDF函数代码无关,我们的任何字段配置的变动,都不会影响Pig的UDF函数的代码,而在UDF函数里,唯一要做的,就是将最终数据,提供给solr和es服务。 

(3)solr和es都提供了restful风格的http操作方式,这时候,我们的检索集群完全可以与Hadoop集群分离,从而让他们各自都专注自己的服务。 



下面,散仙就具体说下如何使用Pig和Solr集成? 

(1)依旧访问这个地址下载源码压缩包。 
(2)提取出自己想要的部分,在eclipse工程中,修改定制适合自己环境的的代码(Solr版本是否兼容?hadoop版本是否兼容?,Pig版本是否兼容?)。 
(3)使用ant重新打包成jar 
(4)在pig里,注册相关依赖的jar包,并使用索引存储 



注意,在github下载的压缩里直接提供了对SolrCloud模式的提供,而没有提供,普通模式的函数,散仙在这里稍作修改后,可以支持普通模式的Solr服务,代码如下:


SolrOutputFormat函数 

Java代码 

  1. package com.pig.support.solr;  

  2.   

  3.   

  4.   

  5. import java.io.IOException;  

  6. import java.util.ArrayList;  

  7. import java.util.List;  

  8. import java.util.concurrent.Executors;  

  9. import java.util.concurrent.ScheduledExecutorService;  

  10. import java.util.concurrent.TimeUnit;  

  11.   

  12. import org.apache.hadoop.io.Writable;  

  13. import org.apache.hadoop.mapreduce.JobContext;  

  14. import org.apache.hadoop.mapreduce.OutputCommitter;  

  15. import org.apache.hadoop.mapreduce.RecordWriter;  

  16. import org.apache.hadoop.mapreduce.TaskAttemptContext;  

  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

  18. import org.apache.solr.client.solrj.SolrServer;  

  19. import org.apache.solr.client.solrj.SolrServerException;  

  20. import org.apache.solr.client.solrj.impl.CloudSolrServer;  

  21. import org.apache.solr.client.solrj.impl.HttpSolrServer;  

  22. import org.apache.solr.common.SolrInputDocument;  

  23. /** 

  24.  * @author qindongliang 

  25.  * 支持SOlr的SolrOutputFormat 

  26.  * 如果你想了解,或学习更多这方面的 

  27.  * 知识,请加入我们的群: 

  28.  *  

  29.  * 搜索技术交流群(2000人):324714439  

  30.  * 大数据技术1号交流群(2000人):376932160  (已满) 

  31.  * 大数据技术2号交流群(2000人):415886155  

  32.  * 微信公众号:我是攻城师(woshigcs) 

  33.  *  

  34.  * */  

  35. public class SolrOutputFormat extends  

  36.         FileOutputFormat {  

  37.   

  38.     final String address;  

  39.     final String collection;  

  40.   

  41.     public SolrOutputFormat(String address, String collection) {  

  42.         this.address = address;  

  43.         this.collection = collection;  

  44.     }  

  45.   

  46.     @Override  

  47.     public RecordWriter getRecordWriter(  

  48.             TaskAttemptContext ctx) throws IOException, InterruptedException {  

  49.         return new SolrRecordWriter(ctx, address, collection);  

  50.     }  

  51.   

  52.       

  53.     @Override  

  54.     public synchronized OutputCommitter getOutputCommitter(  

  55.             TaskAttemptContext arg0) throws IOException {  

  56.         return new OutputCommitter(){  

  57.   

  58.             @Override  

  59.             public void abortTask(TaskAttemptContext ctx) throws IOException {  

  60.                   

  61.             }  

  62.   

  63.             @Override  

  64.             public void commitTask(TaskAttemptContext ctx) throws IOException {  

  65.                   

  66.             }  

  67.   

  68.             @Override  

  69.             public boolean needsTaskCommit(TaskAttemptContext arg0)  

  70.                     throws IOException {  

  71.                 return true;  

  72.             }  

  73.   

  74.             @Override  

  75.             public void setupJob(JobContext ctx) throws IOException {  

  76.                   

  77.             }  

  78.   

  79.             @Override  

  80.             public void setupTask(TaskAttemptContext ctx) throws IOException {  

  81.                   

  82.             }  

  83.               

  84.               

  85.         };  

  86.     }  

  87.   

  88.   

  89.     /** 

  90.      * Write out the LuceneIndex to a local temporary location.
     

  91.      * On commit/close the index is copied to the hdfs output directory.
     

  92.      *  

  93.      */  

  94.     static class SolrRecordWriter extends RecordWriter {  

  95.         /**Solr的地址*/  

  96.         SolrServer server;  

  97.         /**批处理提交的数量**/  

  98.         int batch = 5000;  

  99.           

  100.         TaskAttemptContext ctx;  

  101.           

  102.         List docs = new ArrayList(batch);  

  103.         ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();  

  104.         /** 

  105.          * Opens and forces connect to CloudSolrServer 

  106.          *  

  107.          * @param address 

  108.          */  

  109.         public SolrRecordWriter(final TaskAttemptContext ctx, String address, String collection) {  

  110.             try {  

  111.                 this.ctx = ctx;  

  112.                 server = new HttpSolrServer(address);  

  113.                   

  114.                 exec.scheduleWithFixedDelay(new Runnable(){  

  115.                     public void run(){  

  116.                         ctx.progress();  

  117.                     }  

  118.                 }, 10001000, TimeUnit.MILLISECONDS);  

  119.             } catch (Exception e) {  

  120.                 RuntimeException exc = new RuntimeException(e.toString(), e);  

  121.                 exc.setStackTrace(e.getStackTrace());  

  122.                 throw exc;  

  123.             }  

  124.         }  

  125.   

  126.           

  127.         /** 

  128.          * On close we commit 

  129.          */  

  130.         @Override  

  131.         public void close(final TaskAttemptContext ctx) throws IOException,  

  132.                 InterruptedException {  

  133.   

  134.             try {  

  135.                   

  136.                 if (docs.size() > 0) {  

  137.                     server.add(docs);  

  138.                     docs.clear();  

  139.                 }  

  140.   

  141.                 server.commit();  

  142.             } catch (SolrServerException e) {  

  143.                 RuntimeException exc = new RuntimeException(e.toString(), e);  

  144.                 exc.setStackTrace(e.getStackTrace());  

  145.                 throw exc;  

  146.             } finally {  

  147.                 server.shutdown();  

  148.                 exec.shutdownNow();  

  149.             }  

  150.               

  151.         }  

  152.   

  153.         /** 

  154.          * We add the indexed documents without commit 

  155.          */  

  156.         @Override  

  157.         public void write(Writable key, SolrInputDocument doc)  

  158.                 throws IOException, InterruptedException {  

  159.             try {  

  160.                 docs.add(doc);  

  161.                 if (docs.size() >= batch) {  

  162.                     server.add(docs);  

  163.                     docs.clear();  

  164.                 }  

  165.             } catch (SolrServerException e) {  

  166.                 RuntimeException exc = new RuntimeException(e.toString(), e);  

  167.                 exc.setStackTrace(e.getStackTrace());  

  168.                 throw exc;  

  169.             }  

  170.         }  

  171.   

  172.     }  

  173. }  





SolrStore函数 

  1. package com.pig.support.solr;  

  2.   

  3.   

  4.   

  5. import java.io.IOException;  

  6. import java.util.Properties;  

  7.   

  8. import org.apache.hadoop.fs.Path;  

  9. import org.apache.hadoop.io.Writable;  

  10. import org.apache.hadoop.mapreduce.Job;  

  11. import org.apache.hadoop.mapreduce.OutputFormat;  

  12. import org.apache.hadoop.mapreduce.RecordWriter;  

  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

  14. import org.apache.pig.ResourceSchema;  

  15. import org.apache.pig.ResourceSchema.ResourceFieldSchema;  

  16. import org.apache.pig.ResourceStatistics;  

  17. import org.apache.pig.StoreFunc;  

  18. import org.apache.pig.StoreMetadata;  

  19. import org.apache.pig.data.Tuple;  

  20. import org.apache.pig.impl.util.UDFContext;  

  21. import org.apache.pig.impl.util.Utils;  

  22. import org.apache.solr.common.SolrInputDocument;  

  23.   

  24. /** 

  25.  *  

  26.  * Create a lucene index 

  27.  *  

  28.  */  

  29. public class SolrStore extends StoreFunc implements StoreMetadata {  

  30.   

  31.     private static final String SCHEMA_SIGNATURE = "solr.output.schema";  

  32.   

  33.     ResourceSchema schema;  

  34.     String udfSignature;  

  35.     RecordWriter writer;  

  36.   

  37.     String address;  

  38.     String collection;  

  39.       

  40.     public SolrStore(String address, String collection) {  

  41.         this.address = address;  

  42.         this.collection = collection;  

  43.     }  

  44.   

  45.     public void storeStatistics(ResourceStatistics stats, String location,  

  46.             Job job) throws IOException {  

  47.     }  

  48.   

  49.     public void storeSchema(ResourceSchema schema, String location, Job job)  

  50.             throws IOException {  

  51.     }  

  52.   

  53.     @Override  

  54.     public void checkSchema(ResourceSchema s) throws IOException {  

  55.         UDFContext udfc = UDFContext.getUDFContext();  

  56.         Properties p = udfc.getUDFProperties(this.getClass(),  

  57.                 new String[] { udfSignature });  

  58.         p.setProperty(SCHEMA_SIGNATURE, s.toString());  

  59.     }  

  60.   

  61.     public OutputFormat getOutputFormat()  

  62.             throws IOException {  

  63.         // not be used  

  64.         return new SolrOutputFormat(address, collection);  

  65.     }  

  66.   

  67.     /** 

  68.      * Not used 

  69.      */  

  70.     @Override  

  71.     public void setStoreLocation(String location, Job job) throws IOException {  

  72.         FileOutputFormat.setOutputPath(job, new Path(location));  

  73.     }  

  74.   

  75.     @Override  

  76.     public void setStoreFuncUDFContextSignature(String signature) {  

  77.         this.udfSignature = signature;  

  78.     }  

  79.   

  80.     @SuppressWarnings({ "unchecked""rawtypes" })  

  81.     @Override  

  82.     public void prepareToWrite(RecordWriter writer) throws IOException {  

  83.         this.writer = writer;  

  84.         UDFContext udc = UDFContext.getUDFContext();  

  85.         String schemaStr = udc.getUDFProperties(this.getClass(),  

  86.                 new String[] { udfSignature }).getProperty(SCHEMA_SIGNATURE);  

  87.   

  88.         if (schemaStr == null) {  

  89.             throw new RuntimeException("Could not find udf signature");  

  90.         }  

  91.   

  92.         schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));  

  93.   

  94.     }  

  95.   

  96.     /** 

  97.      * Shamelessly copied from : https://issues.apache.org/jira/secure/attachment/12484764/NUTCH-1016-2.0.patch 

  98.      * @param input 

  99.      * @return 

  100.      */  

  101.     private static String stripNonCharCodepoints(String input) {  

  102.         StringBuilder retval = new StringBuilder(input.length());  

  103.         char ch;  

  104.   

  105.         for (int i = 0; i < input.length(); i++) {  

  106.             ch = input.charAt(i);  

  107.   

  108.             // Strip all non-characters  

  109.             // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]  

  110.             // and non-printable control characters except tabulator, new line  

  111.             // and carriage return  

  112.             if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step  

  113.                                             // 0x10000  

  114.                     ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range  

  115.                     (ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef  

  116.                     (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {  

  117.   

  118.                 retval.append(ch);  

  119.             }  

  120.         }  

  121.   

  122.         return retval.toString();  

  123.     }  

  124.   

  125.     @Override  

  126.     public void putNext(Tuple t) throws IOException {  

  127.   

  128.         final SolrInputDocument doc = new SolrInputDocument();  

  129.   

  130.         final ResourceFieldSchema[] fields = schema.getFields();  

  131.         int docfields = 0;  

  132.   

  133.         for (int i = 0; i < fields.length; i++) {  

  134.             final Object value = t.get(i);  

  135.   

  136.             if (value != null) {  

  137.                 docfields++;  

  138.                 doc.addField(fields[i].getName().trim(), stripNonCharCodepoints(value.toString()));  

  139.             }  

  140.   

  141.         }  

  142.   

  143.         try {  

  144.             if (docfields > 0)  

  145.                 writer.write(null, doc);  

  146.         } catch (InterruptedException e) {  

  147.             Thread.currentThread().interrupt();  

  148.             return;  

  149.         }  

  150.   

  151.     }  

  152.   

  153. }  



Pig脚本如下: 

  1. --注册依赖文件的jar包  

  2. REGISTER ./dependfiles/tools.jar;  

  3.   

  4. --注册solr相关的jar包  

  5. REGISTER  ./solrdependfiles/pigudf.jar;   

  6. REGISTER  ./solrdependfiles/solr-core-4.10.2.jar;  

  7. REGISTER  ./solrdependfiles/solr-solrj-4.10.2.jar;  

  8. REGISTER  ./solrdependfiles/httpclient-4.3.1.jar  

  9. REGISTER  ./solrdependfiles/httpcore-4.3.jar  

  10. REGISTER  ./solrdependfiles/httpmime-4.3.1.jar  

  11. REGISTER  ./solrdependfiles/noggit-0.5.jar  

  12.   

  13.   

  14. --加载HDFS数据,并定义scheaml  

  15. a = load ‘/tmp/data‘ using PigStorage(‘,‘) as (sword:chararray,scount:int);  

  16.   

  17. --存储到solr中,并提供solr的ip地址和端口号  

  18. store d into ‘/user/search/solrindextemp‘  using com.pig.support.solr.SolrStore(‘http://localhost:8983/solr/collection1‘,‘collection1‘);  

  19. ~                                                                                                                                                              

  20. ~                                                                        

  21. ~                                 



配置成功之后,我们就可以运行程序,加载HDFS上数据,经过计算处理之后,并将最终的结果,存储到Solr之中,截图如下: 


技术分享



成功之后,我们就可以很方便的在solr中进行毫秒级别的操作了,例如各种各样的全文查询,过滤,排序统计等等! 

同样的方式,我们也可以将索引存储在ElasticSearch中,关于如何使用Pig和ElasticSearch集成,散仙也会在后面的文章中介绍,敬请期待! 

玩转大数据系列之Apache Pig如何与Apache Solr集成(二)


推荐阅读
  • Java验证码——kaptcha的使用配置及样式
    本文介绍了如何使用kaptcha库来实现Java验证码的配置和样式设置,包括pom.xml的依赖配置和web.xml中servlet的配置。 ... [详细]
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • 本文介绍了使用AJAX的POST请求实现数据修改功能的方法。通过ajax-post技术,可以实现在输入某个id后,通过ajax技术调用post.jsp修改具有该id记录的姓名的值。文章还提到了AJAX的概念和作用,以及使用async参数和open()方法的注意事项。同时强调了不推荐使用async=false的情况,并解释了JavaScript等待服务器响应的机制。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 在project.properties添加#Projecttarget.targetandroid-19android.library.reference.1..Sliding ... [详细]
  • 猜字母游戏
    猜字母游戏猜字母游戏——设计数据结构猜字母游戏——设计程序结构猜字母游戏——实现字母生成方法猜字母游戏——实现字母检测方法猜字母游戏——实现主方法1猜字母游戏——设计数据结构1.1 ... [详细]
  • CentOS 7部署KVM虚拟化环境之一架构介绍
    本文介绍了CentOS 7部署KVM虚拟化环境的架构,详细解释了虚拟化技术的概念和原理,包括全虚拟化和半虚拟化。同时介绍了虚拟机的概念和虚拟化软件的作用。 ... [详细]
  • 本文介绍了一种解析GRE报文长度的方法,通过分析GRE报文头中的标志位来计算报文长度。具体实现步骤包括获取GRE报文头指针、提取标志位、计算报文长度等。该方法可以帮助用户准确地获取GRE报文的长度信息。 ... [详细]
  • PDF内容编辑的两种小方法,你知道怎么操作吗?
    本文介绍了两种PDF内容编辑的方法:迅捷PDF编辑器和Adobe Acrobat DC。使用迅捷PDF编辑器,用户可以通过选择需要更改的文字内容并设置字体形式、大小和颜色来编辑PDF文件。而使用Adobe Acrobat DC,则可以通过在软件中点击编辑来编辑PDF文件。PDF文件的编辑可以帮助办公人员进行文件内容的修改和定制。 ... [详细]
  • CentOS 6.5安装VMware Tools及共享文件夹显示问题解决方法
    本文介绍了在CentOS 6.5上安装VMware Tools及解决共享文件夹显示问题的方法。包括清空CD/DVD使用的ISO镜像文件、创建挂载目录、改变光驱设备的读写权限等步骤。最后给出了拷贝解压VMware Tools的操作。 ... [详细]
author-avatar
木木的亦尘_283
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有