热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

纪录:Solr6.4.2+Flume1.7.0+morphline+kafka集成

    当前大多数企业版hadoop的solr版本都还停留在solr4.x,由于这个版本的solr本身的bug较多,使用起来会出很多奇怪的问题。如部分更新日期字段失败的问题。   

     当前大多数企业版hadoop的solr版本都还停留在solr4.x,由于这个版本的solr本身的bug较多,使用起来会出很多奇怪的问题。如部分更新日期字段失败的问题

     最新的solr版本不仅修复了以前的一些常见bug,还提供了更简便易用的功能,如ManagedSchema替代schema.xml来管理索引的schema。

    由于solr自带的接口和入库工具需要一些定制开发,所以通常用flume来作为数据采集的工具。数据流图如下:

纪录:Solr6.4.2+Flume1.7.0 +morphline+kafka集成

具体见前文:《json数据处理实战:Kafka+Flume+Morphline+Solr+Hue数据组合索引

在Cloudera等企业版hadoop中,Solr和Flume已经集成,并能互通。如果你目前的情况是使用Cloudera企业版,请看上面这篇文章。

然而由于集成的版本跟不上开源社区最新版本,还是很嫌弃的。于是就有了下面的配置最新版本的Solr和Flume互通:

 

1.Solr最新版服务部署及入门:

见solr官网quickstart。

http://lucene.apache.org/solr/quickstart.html

说明:创建Solr集合的部分,不是本章重点,所以这里没有介绍。

另,本例中和前文不同,使用的不是SolrCloud模式,而是单机的Solr。

 

2.Flume最新版部署及入门

下载地址:http://flume.apache.org/download.html

入门介绍:https://cwiki.apache.org//confluence/display/FLUME/Getting+Started

详细配置介绍:http://flume.apache.org/FlumeUserGuide.html

详细配置介绍中,需要关注的是KafkaSource和MorphlineSolrSink。

最终的flume.conf配置为:

kafka2solr.sources = source_from_kafka
kafka2solr.channels = customer_doc_channel
kafka2solr.sinks = solr_sink1

# For each one of the sources, the type is defined  
kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
kafka2solr.sources.source_from_kafka.channels = customer_doc_channel
kafka2solr.sources.source_from_kafka.batchSize = 100
kafka2solr.sources.source_from_kafka.useFlumeEventFormat=false
kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092
kafka2solr.sources.source_from_kafka.kafka.topics = tablecardLogin
kafka2solr.sources.source_from_kafka.kafka.consumer.group.id = catering_customer_core_070327
kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=earliest

# Other config values specific to each type of channel(sink or source)  
# can be defined as well  
kafka2solr.channels.customer_doc_channel.type = file
kafka2solr.channels.customer_doc_channel.capacity=10000000
kafka2solr.channels.customer_doc_channel.checkpointDir = /home/arli/data/flume-ng/customer_doc/checkpoint
kafka2solr.channels.customer_doc_channel.dataDirs = /home/arli/data/flume-ng/customer_doc/data

kafka2solr.sinks.solr_sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
kafka2solr.sinks.solr_sink1.channel = customer_doc_channel
kafka2solr.sinks.solr_sink1.batchSize = 5000
kafka2solr.sinks.solr_sink1.batchDurationMillis = 2000
kafka2solr.sinks.solr_sink1.morphlineFile = /home/arli/flume-config/morphlines.conf
kafka2solr.sinks.solr_sink1.morphlineId=morphline1
kafka2solr.sinks.solr_sink1.isIgnoringRecoverableExceptions=true
#kafka2solr.sinks.solr_sink1.isProductionMode=true

 

3.新建一个Flume配置目录,下面四个文件是比较重要的。

纪录:Solr6.4.2+Flume1.7.0 +morphline+kafka集成

flume.conf 来自上一节的配置。

flume-env.sh 来自安装目录conf下的flume-env.sh.template。需要改动。

log4j.properties 在调试过程中可以开启更低级别的日志打印。

morphline.conf 参考Morphline的文档:http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html

 

4.接下来详细介绍上面的后三个配置文件。

1)flume-env.sh

纪录:Solr6.4.2+Flume1.7.0 +morphline+kafka集成

需要改动的地方如上:

#默认的内存是不够的。需要扩大内存。
export JAVA_OPTS="-Xms100m -Xmx500m -Dcom.sun.management.jmxremote"

#Flume官方下载的包少了一些Solr相关的包,需要把solr的lib目录加到flume的classpath下。
FLUME_CLASSPATH="/xxx/solr-6.4.2/contrib/morphlines-core/lib/*:/xxxi/solr-6.4.2/dist/*:/xxx/solr-6.4.2/dist/solrj-lib/*:/xxx/solr-6.4.2/server/solr-webapp/webapp/WEB-INF/lib/*"

 

2)log4j.properties

100MB改成10MB,以防打日志太多日志文件过大。

纪录:Solr6.4.2+Flume1.7.0 +morphline+kafka集成

在调试阶段,加上如下两行会省心很多,调试完再去掉。

log4j.logger.org.apache.flume.sink.solr=DEBUG
log4j.logger.org.kitesdk.morphline=TRACE

 

3)morphline.conf

大部分和前文:《json数据处理实战:Kafka+Flume+Morphline+Solr+Hue数据组合索引》雷同。由于我使用的是单机版本的Solr,所以在配置时如下。

注意solrUrl和solrHomeDir的配置,在官网中没有介绍(因为morphline是cloudera开发并开源的,cloudera的solr默认是solrCloud),但是在源码阅读时可以看到这两个单机solr配置参数。

SOLR_LOCATOR : {
   solrUrl : "http:\/\/localhost:8983\/solr\/catering_customer_core1"
   solrHomeDir : "/xxx/server/solr/catering_customer_core1/conf"
}

morphlines : [
  {
    #customer morphline
    id : morphline1
    
    # Import all morphline commands in these java packages and their subpackages.
    # Other commands that may be present on the classpath are not visible to this morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    
    commands : [                 
    { 
        readJson {}
    } 
    
    {
        tryRules 
        {
            catchExceptions : false
            throwExceptionIfAllRulesFailed : true
            rules : [
            {
              commands : [
                { 
                    contains {topic : [tablecardLogin] } 
                }
                
                #field need to be indexed from json.
                {
                    extractJsonPaths {
                      flatten : false
                      paths : {
                        account:/account         
                        customer_id:/customerId
                        history_signin_dates:/opt_time
                        history_signin_timestamps:/opt_time
                        name:/name
                        sex:/sex
                        }
                    }
                }
              ]
              
            }
            

            # if desired, the last rule can serve as a fallback mechanism 
            # for records that don't match any rule:
            {
                    commands : [
                    { logWarn { format : "Ignoring record with unsupported input format: {}", args : ["@{}"] } }
                    { dropRecord {} }    
                    ]
            }
          ]
        }
    }

    {
        convertTimestamp {
            field : history_signin_dates
            inputFormats : ["yyyy-MM-dd HH:mm:ss"]
            inputTimezone : Asia/Shanghai
            outputFormat : "yyyy-MM-dd'T'HH:mm:ss'Z/SECOND'"
            outputTimezone : Asia/Shanghai
            }
    }

    {
        convertTimestamp {
            field : history_signin_timestamps
            inputFormats : ["yyyy-MM-dd HH:mm:ss"]
            inputTimezone : Asia/Shanghai
            outputFormat : "unixTimeInMillis"
            outputTimezone : UTC
            }
    }

    {
        java {
            imports : "import java.util.*;import org.kitesdk.morphline.api.Command;import org.kitesdk.morphline.api.Record;"
            code:     """
                            Object customerId = record.getFirstValue("customer_id");
                            Object account = record.getFirstValue("account");
                            record.put("id", account + "@" + customerId);
                            return child.process(record);
                    """
            }
    }
    
    {sanitizeUnknownSolrFields {solrLocator : ${SOLR_LOCATOR}}}

    #将数据导入到solr中
    {loadSolr {solrLocator : ${SOLR_LOCATOR}}}

     ]
  }
]

 

4.Morphline中的sanitizeUnknownSolrFields命令需要有schema.xml才能使用。

Solr6.4.2的schema默认是用managed-schema文件管理的。如果上面配置中的solrHomeDir目录下没有shema.xml文件,则会报错。

好在managed-schema和之前schema.xml文件内容几乎一致。执行如下命令即可。

cp managed-schema schema.xml

 

5.解决Flume1.7.0和solr6.4.2的jar包冲突问题。

Flume1.7在编译时使用的是Solr4.10.1的包,而其中lib目录下,Solrj依赖的httpcore-4.1.3包已与最新的Solrj不兼容,因此在solr目录dist/solrj-lib下找到对应的包然后替换。

纪录:Solr6.4.2+Flume1.7.0 +morphline+kafka集成

另外还需要清理的两种包:1.Flume的lib目录老的solr版本相关的包,2.若存在kite-morphline-solr-core(因为solr自己发布的版本已经包含了等价的solr-morphline-core包)则需要清理。(由于本文在写作时相应的包都已经清理了,所以记录的不够细节,望见谅。)

 

6.启动flume。调试时可以先在控制台启动,去掉最后的&。

bin/flume-ng agent --conf ~/flume-config/ -f ~/flume-config/flume.conf  -n kafka2solr &

推荐阅读
  • EPICS Archiver Appliance存储waveform记录的尝试及资源需求分析
    本文介绍了EPICS Archiver Appliance存储waveform记录的尝试过程,并分析了其所需的资源容量。通过解决错误提示和调整内存大小,成功存储了波形数据。然后,讨论了储存环逐束团信号的意义,以及通过记录多圈的束团信号进行参数分析的可能性。波形数据的存储需求巨大,每天需要近250G,一年需要90T。然而,储存环逐束团信号具有重要意义,可以揭示出每个束团的纵向振荡频率和模式。 ... [详细]
  • 本文介绍了数据库的存储结构及其重要性,强调了关系数据库范例中将逻辑存储与物理存储分开的必要性。通过逻辑结构和物理结构的分离,可以实现对物理存储的重新组织和数据库的迁移,而应用程序不会察觉到任何更改。文章还展示了Oracle数据库的逻辑结构和物理结构,并介绍了表空间的概念和作用。 ... [详细]
  • 本文详细介绍了MySQL表分区的创建、增加和删除方法,包括查看分区数据量和全库数据量的方法。欢迎大家阅读并给予点评。 ... [详细]
  • 开发笔记:使用JavaScript解决网页图片拉伸问题
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了使用JavaScript解决网页图片拉伸问题相关的知识,希望对你有一定的参考价值。 ... [详细]
  • Windows下配置PHP5.6的方法及注意事项
    本文介绍了在Windows系统下配置PHP5.6的步骤及注意事项,包括下载PHP5.6、解压并配置IIS、添加模块映射、测试等。同时提供了一些常见问题的解决方法,如下载缺失的msvcr110.dll文件等。通过本文的指导,读者可以轻松地在Windows系统下配置PHP5.6,并解决一些常见的配置问题。 ... [详细]
  • 本文介绍了在Ubuntu系统中清理残余配置文件和无用内容的方法,包括清理残余配置文件、清理下载缓存包、清理不再需要的包、清理无用的语言文件和清理无用的翻译内容。通过这些清理操作可以节省硬盘空间,提高系统的运行效率。 ... [详细]
  • 开发笔记:(002)spring容器中bean初始化销毁时执行的方法及其3种实现方式
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了(002)spring容器中bean初始化销毁时执行的方法及其3种实现方式相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 03Spring使用注解方式注入
    基于注解的DI注入1.导包环境搭建:导入aop包(spring-aop-4.1.6.RELEASE.jar)2.创建类3.创建spring.xml配置文件(必须在src目录下)该配 ... [详细]
  • Spring MVC定制用户登录注销实现示例
    这篇文章描述了如何实现对SpringMVCWeb应用程序的自定义用户访问(登录注销)。作为前提,建议读者阅读这篇文章,其中介 ... [详细]
  • 部署solr建立nutch索引
    2019独角兽企业重金招聘Python工程师标准接着上篇nutch1.4的部署应用,我们来部署一下solr,solr是对lucene进行了封装的企 ... [详细]
  • 原创 | 大数据入门基础系列之ClouderaManager版本的Hive安装部署
    添加服务,一 ... [详细]
  • 一:什么是solrSolr是apache下的一个开源项目,使用Java基于lucene开发的全文搜索服务器;Lucene是一个开放源代 ... [详细]
  • asp.net 有什么框架,有什么技术
    原文地址:http:www.cnblogs.comvirusswbarchive201201102318169.html文章写的很好,转载一些到自己的博 ... [详细]
  • 作者同类文章X转自:http:www.aboutyun.comthread-7949-1-1.html问题导读:1.什么是flume? ... [详细]
  • Nexus3.0.0+Maven的使用(一)
    1、Nexus介绍Nexus是一个强大的Maven仓库管理器,它极大地简化了自己内部仓库的维护和外部仓库的访问。利用Nexus你可以只在一个地方就能够完全控制访问和部署在你所维护仓 ... [详细]
author-avatar
天蝎騎著豬狂奔
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有