Es索引的我们可以理解为数据入库的一个过程。我们知道Es是基于Lucene框架的一个分布式检索平台。索引的同样也是基于Lucene创建的,只不过在其上层做了一些封闭。
Es的索引过程比较通用的大体上有两种方式,其一是得用自身Rvier从数据库中拉数据,当然现在已经有了很多相关插件,Mysql、MDB等数据库。这种方式可以做到近时实索引,因为River是定时从数据库拉数据与索引数据进行比对。这种方式经较适合数据有周期的更新。
下面以Mysql-River plugins为例:
1、 安装Mysql-River 插件
bin/plugin -install /path/to/plugin/river-mysql.zip
2、 当安装好Mysql-River plugin 后,一般可以马上使用,但建立重新加载Es集群。查看log中是否正确的加载了Mysql-River Plugin(在后面我们讲到如何开发相关Plugin)。
3、 配置Es索引与Mysql 数据之间的对应关系。
建立索引(相关Mapping 信息如下:)
curl -XPUT 127.0.0.1:9200/elasticsearchindexname/elasticsearchtypename/_mapping -d
"elasticsearchtypename" : {
"_timestamp":{
"enabled":true
}
}
将River索引的配置也提交到Es集群中:
curl -XPUT 127.0.0.1:9200/_river/river-mysql/_meta –d
{
"type":"mysql",
"mysql":{
"index":"elasticsearchindexname",(索引名称)
"type":"elasticsearchtypename",(类型)
"hostname":"127.0.0.1:3306",(服务器)
"database":"ESDATA",(数据库名称)
"username":"root",(用户名)
"password":"",(密码)
"uniqueIdField":"_ID",(标识)
"query":"select RID,PNAME FROM wf_mds_chn_biaozhun",(SQL语句)
"deleteOldEntries":"false",
"interval":"60000"(更新周期)
}
}
同时你会在Es看到你的索引中开始导数据了,当然些时也会出现一个对应的保存配置的索引,现在很多River都只能索引字段与数据库的字段一一对应。如果需要个性化定制,可以到Github上下载相关代码进行修改。我们可以看到只要继续River(接口)和AbstractRiverComponent(类)便可以进行相关开发了。
public class MysqlRiver extends AbstractRiverComponent implements River
另外一种索引方式当然就是我们把数据Put到Es中去了,最简单的我们可以用下面命令就完成:
$ curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elastic Search"
}'
对上面的命令解释一下:
Twitter:索引名称
Tweet:类型名称
1:ID值
具体我会在下篇中讲解索引名称和类型的关系,当然-d 后面的就是值了。这是单条Put数据的方式虽然简单,但是如果说数据量很大的情况下还是不建议用这种方式,可以改用批量导入的方式也就是传说中的Bluk了,Bluk原量很简单,我们把数据放到缓存中,一次Put多条数据到Es集群中去,Bluk当然要用代码实现了,给出一个例子如下:
public static void Index() throws ElasticSearchException, IOException, NumberFormatException, SQLException {
// TODO Auto-generated method stub
// Node node = nodeBuilder().client(true).node();
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", "elasticsearch_wf").build();
Client client = new TransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(
"168.160.200.250", 9300));
////еܼ¼ֳ5000һѯ
int countRe=100000000; //MySqlClass.getCount("select count(*) from test");
if(countRe>0)
{
int readercount=1;
if(countRe>5000)
{
readercount=countRe%5000==0?countRe/5000:countRe/5000+1;
}
////ÿζȡ5000¼
for(int j=0;j { ResultSet rs = MySqlClass.executeQuery("select * from test"); BulkRequestBuilder bulkRequest = client.prepareBulk(); try { if (rs != null) { int i = 1; while (rs.next()) { bulkRequest.add(client.prepareIndex("qtest", String.valueOf(i++)).setSource( jsonBuilder().startObject() .field("id", rs.getInt("id")) .field("й", rs.getString("title")) .field("AB_EN", rs.getString("descript")) .field("AF_CN",rs.getString("text")) .endObject())); } BulkResponse bulkRespOnse= bulkRequest.execute().actionGet(); if (bulkResponse.hasFailures()) { /* has Failures handler Error */ } } } catch (Exception e) { e.printStackTrace(); } } } client.close(); } 上面只是一个简单的例子,大量可以考虑用从线程方式,另外Client链接数其实还是比较占资源的,大家可以考虑将出封闭到一个链接池中,提供效率。 整个建索引的过程Es在Lucene的基础上还是做了很多的优化,但主体上我们对应到Lucene里面基实就是如下代码: public class Index { private IndexWriter writer = null; private static Analyzer ANALYZER = new IKAnalyzer(); private String FilePath = null; public Index(String FilePath, String IndexPath) { try { IndexWriterConfig writerCOnfig= new IndexWriterConfig( Version.LUCENE_36, ANALYZER); this.writer = new IndexWriter( FSDirectory.open(new File(IndexPath)), writerConfig); this.FilePath = FilePath; } catch (Exception e) { e.printStackTrace(); } } /* * Init Create Index */ public void Init() { try { if (FilePath.length() > 0) { // 读目录中txt文件 File file = new File(FilePath); List this.ListAllFile(file, files); // //将File转换为 Document对象 for (File sfs : files) { this.writer.addDocument(this.getDocument(sfs)); } } } catch (Exception e) { e.printStackTrace(); } } /* * Close Index */ public void Close() { try { this.writer.commit(); this.writer.close(); } catch (Exception e) { e.printStackTrace(); } } /* * 获取所有txt文件 */ private List throws Exception { if (fileOrDir != null && files != null) { if (fileOrDir.isDirectory()) { File[] fs = fileOrDir.listFiles(); for (File sfs : fs) { if (sfs.isDirectory()) this.ListAllFile(sfs, files); else files.add(sfs); } } else { files.add(fileOrDir); } } return null; } /* * Get Document */ private Document getDocument(File f) throws Exception { Document doc = new Document(); FileInputStream is = new FileInputStream(f); byte[] buf = new byte[is.available()]; is.read(buf); String cOntentStr= new String(buf,"GBK"); Field cOntent= new Field("content", contentStr, Field.Store.YES, Field.Index.ANALYZED); doc.add(content); Field path = new Field("path", f.getAbsolutePath(), Field.Store.YES, Field.Index.ANALYZED); Field size=new Field("size",String.valueOf(f.getTotalSpace()),Field.Store.YES,Field.Index.NOT_ANALYZED); doc.add(size); Random rm=new Random(); int year=rm.nextInt(20); Field time=new Field("time",String.valueOf(1990+year),Field.Store.YES,Field.Index.NOT_ANALYZED); doc.add(time); doc.add(path); is.close(); return doc; } }