ElasticSearch是一个基于Lucene的搜索服务器。它和Lucene之间的关系在下文的介绍中会通过流图来详细分析的。现在只要知道,ES(ElasticSearch)是基于Lucene开发的一个搜索服务器。如果就那么简单的话,ES就不会像现在那么流行了,它还提供了一个分布式多用户能力的全文搜索引擎。全文搜索这是Lucene本身就具备的,而分布式这个东西就非常有用了。像现在的大数据平台Hadoop,Spark基本都是分布式的,分布式计算的好处在这里就不细讲了。有了ES就能和这些大数据平台无缝衔接,这也是ES流行的原因之一吧。
此外,ElasticSearch提供了RESTful web的接口,这让ES非常容易的连接到很多平台之上,我们只需要通过与服务器端口之间的数据交互就能轻易地使用ES。ES是基于java开发,并提供python,PHP,go等多语言的api。本文对ES的开发是完全基于python完成。
可能你没有使用过ES,或者第一次接触ES,但是你一定使用过ES的服务,现在很多流行的应用都是基于ES开发的,比如github,***,***,抖音中的一些搜索服务。类似的其实还有更多,并不是每家公司有能力开发出一套像百度,谷歌这样完整的搜索引擎。
这一部分才是我学习ES最关心的地方,不要花里胡哨的说那么多,你到底能做什么呐?(show me your ability)。
上图很清晰的介绍了ElasticSearch和Lucene之间的关系,作为开发者,我们只需要将数据和ES与用户交互部分完成即可。其中数据可以来自两部分,一部分是通过爬虫实时爬取,那就可以真正的实现一个搜索引擎了,另外可以将本地数据导入到ES中。
按照上图的分析,使用ElasticSearch,第一步就是需要建立ES能用的数据,ES其实就是一个面向文档型的数据库,一条数据在ES中就是一个文档,用json的格式对文档进行序列化。比如一条用户的数据如下所示:
{
"name" : "fangfang",
"sex" : "Female",
"age" : 21,
"birthDate": "1998/05/01",
"about" : "I love sing,dance,basketball and hip",
"interests": [ "basketball", "music" ]
}
用关系型数据库的话,最直接的想法就是建立一个User表,然后建立相应的字段等。
在ES中这就是一个文档,当然这个文档会属于一个User的类型,各种各样的类型存在于一个索引当中。这里有一份简易的将Elasticsearch和关系型数据术语对照表:
关系数据库 | 数据库 | 表 | 行 | 列 |
---|---|---|---|---|
ES | 索引(index) | 类型(type) | 文档 | 字段(fields) |
一目了然吧,一个ES集群中可以包含多个索引(数据库),一个索引也可以对应了多个类型(表),这些类型中包含了很多的文档(行),然后每个文档中又包含了很多的字段(列)。
现在清楚了吧,要想使用ES,首先我们首先需要json格式的数据。有了json数据之后,怎么把这些数据存储到ES中呐?
这篇文章不会详细讲解索引是什么?以及ES为什么要用索引的方式来存储数据,现在只要知道,这一切都是为了搜索服务的,也就是说,这一切都是为了提高搜索的性能。
#coding:utf-8
import os
import time
from os import walk
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch import helpers
from datetime import datetime
import json
class ElasticObj:
def __init__(self,index_name,index_type,ip="127.0.0.1"):
"""
param
index_name:索引名称
index_type:索引类型
"""
self.index_name = index_name
self.index_type = index_type
self.es = Elasticsearch([ip],http_auth=('elastic','password'),port = 9200)
def set_index(self,index_name):
"""
索引设置
"""
_index_settings={
"settings":{
"number_of_shards": "6",
"number_of_replicas": "1",
}
}
if self.es.indices.exists(index = self.index_name) is not True:
res = self.es.indices.create(index = self.index_name,body = _index_settings)
print(res)
def create_index(self,index_name='user_test1',index_type='user_type_test1'):
"""
创建索引,索引名称为user,索引类型为user_type
"""
#创建映射
_index_mappings={
"properties": {
"user_name": {
"type": "text",
"analyzer":"ik_max_word",
"search_analyzer":"ik_max_word"
}
}
}
if self.es.indices.exists(index = self.index_name) is True:
res = self.es.indices.put_mapping(doc_type=index_type,index=self.index_name,body=_index_mappings)
print(res)
def index_data(self):
"""
存储样例数据到es
"""
index_list = [
{
"user_name":"支付宝"
},
{
"user_name":"支付宝天猫"
},
{
"user_name":"支付宝淘宝"
},
{
"user_name":"百度"
},
{
"user_name":"百度金融"
},
{
"user_name":"百度小满"
}
]
id =1;
for item in index_list:
res = self.es.index(index=self.index_name,doc_type=self.index_type,body=item,id = id)
id+=1
print(res)
def bulk_index_data(self,user_path='D:/rookieproject/user_name/user-Names.txt'):
"""
批量建立索引数据
user_path:对应的user_name文件
return:
"""
#result = {}
lines=[]
with open(user_path,'r',encoding='UTF-8') as f:
for line in f.readlines():
#results['user_name']=line.strip()
#lines.append({"user_name":line.strip()})
lines.append({"user_name":line.strip()})
actiOns= []
i = 1
for line in lines:
action = {
"_index":self.index_name,
"_type":self.index_type,
"_id":i,
"_source":{
"user_name":line['user_name']
}
}
i+=1
actions.append(action)
sucess,_ = bulk(self.es,actions,index=self.index_name,raise_on_error = True,request_timeout=100)
print(sucess)
def delete_index_data(self,id):
"""
删除索引中的一条数据
param id
return :
"""
res = self.es.delete(index = self.index_name,doc_type=self.index_type,id = id)
#删除结果
print(res)
def get_data_id(self,id):
res = self.es.get(index = self.index_name,doc_type = self.index_type,id = id)
print(res['_source'])
print("-----------------------------------------------")
print(res)
#for hit in res['hits']:
# print(hit['_source']['user_name'],hit['_source']['user_name'])
def get_data_by_query(self,query_term="支付宝"):
result = {}
ans_li=[]
score_li=[]
ress =""
doc = {
"query":{
"match":{
"user_name":{
"query":query_term,
"analyzer":"ik_max_word"
}
}
},
"highlight": {
"fields": {
"user_name":{}
}
}
}
_searched = self.es.search(index = self.index_name,doc_type = self.index_type,body = doc,search_type='query_then_fetch')
print(_searched)
for hit in _searched['hits']['hits']:
ress=ress+hit['highlight']['user_name'][0]+"
"
ans_li.append(hit['_source']['user_name'])
score_li.append(hit['_score'])
print(hit['_source']['user_name'],hit['_score'])
print("===============================================")
result['ans'] = ans_li
result['score'] = score_li
return result,ress
if __name__=="__main__":
obj = ElasticObj("user_ik_base","ik_base_type")
#obj = ElasticObj("user","user_type")
#obj.set_index("user_ik_base")
#obj.create_index("user_ik_base","ik_base_type")
#obj.index_data()
#obj.get_data_id(1)
#obj.bulk_index_data()
for i in range(100):
print("输入公司名称:")
user_name = input()
#print("搜索结果:")
print("===============================================")
obj.get_data_by_query(user_name)
#obj.bulk_index_data()
先上代码,通过python与ES实现交互,以上代码需要安装python库elasticsearch,下载与你安装的对应的ES版本即可。
当然还需要安装elasticsearch
按照下面的步骤建立索引:
def set_index(self,index_name):
"""
索引设置
"""
_index_settings={
"settings":{
"number_of_shards": "6",
"number_of_replicas": "1",
}
}
if self.es.indices.exists(index = self.index_name) is not True:
res = self.es.indices.create(index = self.index_name,body = _index_settings)
print(res)
def create_index(self,index_name='user_test1',index_type='user_type_test1'):
"""
创建索引,索引名称为user,索引类型为user_type
"""
#创建映射
_index_mappings={
"properties": {
"user_name": {
"type": "text",
"analyzer":"ik_max_word",
"search_analyzer":"ik_max_word"
}
}
}
if self.es.indices.exists(index = self.index_name) is True:
res = self.es.indices.put_mapping(doc_type=index_type,index=self.index_name,body=_index_mappings)
print(res)
def index_data(self):
"""
存储样例数据到es
"""
index_list = [
{
"user_name":"支付宝"
},
{
"user_name":"支付宝天猫"
},
{
"user_name":"支付宝淘宝"
},
{
"user_name":"百度"
},
{
"user_name":"百度金融"
},
{
"user_name":"百度小满"
}
]
id =1;
for item in index_list:
res = self.es.index(index=self.index_name,doc_type=self.index_type,body=item,id = id)
id+=1
print(res)
批量导入数据对应的代码:
def bulk_index_data(self,user_path='D:/rookieproject/user_name/user-Names-Corpus.txt'):
"""
批量建立索引数据
user_path:user_name对应的文件
return:
"""
#result = {}
lines=[]
with open(user_path,'r',encoding='UTF-8') as f:
for line in f.readlines():
#results['user_name']=line.strip()
#lines.append({"user_name":line.strip()})
lines.append({"user_name":line.strip()})
actiOns= []
i = 1
for line in lines:
action = {
"_index":self.index_name,
"_type":self.index_type,
"_id":i,
"_source":{
"user_name":line['user_name']
}
}
i+=1
actions.append(action)
sucess,_ = bulk(self.es,actions,index=self.index_name,raise_on_error = True,request_timeout=100)
print(sucess)
搜索对应的代码:
def get_data_by_query(self,query_term="支付宝"):
result = {}
ans_li=[]
score_li=[]
ress =""
doc = {
"query":{
"match":{
"user_name":{
"query":query_term,
"analyzer":"ik_max_word"
}
}
},
"highlight": {
"fields": {
"user_name":{}
}
}
}
_searched = self.es.search(index = self.index_name,doc_type = self.index_type,body = doc,search_type='query_then_fetch')
print(_searched)
for hit in _searched['hits']['hits']:
ress=ress+hit['highlight']['user_name'][0]+"
"
ans_li.append(hit['_source']['user_name'])
score_li.append(hit['_score'])
print(hit['_source']['user_name'],hit['_score'])
print("===============================================")
result['ans'] = ans_li
result['score'] = score_li
return result,ress
本文介绍了ES的基本原理,其中索引,类型,文档和字段和关系型数据库相对应。按照以上的步骤可以快速建立一个针对user_name的全文搜索,本文采用了ik分词器,针对中文分词器的介绍在以后会介绍。
本文针对ES的操作全部通过python api来完成,可以通过bulk接口快速建立大批量文档的索引。
参考:
[1]: https://www.cnblogs.com/sha0830/p/8000242.html
[2]: https://www.elastic.co/cn/products/elasticsearch