作者:xin新的 | 来源:互联网 | 2023-02-11 17:21
今天成功运行了法务咨询自动问答的模块。报错过程就不叙述了要想成功运行法务咨询自动问答需要安装elasticsearch数据库,这样才不会报目标计算机积极拒绝,无法连接的错误要安装7
今天成功运行了法务咨询自动问答的模块。
报错过程就不叙述了
要想成功运行法务咨询自动问答需要安装
elasticsearch数据库,这样才不会报目标计算机积极拒绝,无法连接的错误
要安装7.0之前的版本的
elasticsearch数据库,原因我就不说了,好奇的可以运行一下然后通过报错来查询为啥要用7.0版本之前的
下面是代码部分,只要运行一下代码就可以了
首先运行bulid部分:
#!/usr/bin/env python3
# coding: utf-8
# File: insert_es.py
# Author: lhy
# Date: 18-10-10
import os
import time
import json
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
#import pymongo
class ProcessIntoES:
def __init__(self):
self._index = "crime_data"
self.es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}])
self.doc_type = "crime"
cur = '/'.join(os.path.abspath(__file__).split('/')[:-1])
self.music_file = os.path.join(cur, 'data/qa_corpus.json')
'''创建ES索引,确定分词类型'''
def create_mapping(self):
node_mappings = {
"mappings": {
self.doc_type: { # type
"properties": {
"question": { # field: 问题
"type": "text", # lxw NOTE: cannot be string
#"analyzer": "ik_max_word",
#"search_analyzer": "ik_smart",
"index": "true" # The index option controls whether field values are indexed.
},
"answers": { # field: 问题
"type": "text", # lxw NOTE: cannot be string
#"analyzer": "ik_max_word",
#"search_analyzer": "ik_smart",
"index": "true" # The index option controls whether field values are indexed.
},
}
}
}
}
if not self.es.indices.exists(index=self._index):
self.es.indices.create(index=self._index, body=node_mappings)
print("Create {} mapping successfully.".format(self._index))
else:
print("index({}) already exists.".format(self._index))
'''批量插入数据'''
def insert_data_bulk(self, action_list):
success, _ = bulk(self.es, action_list, index=self._index, raise_on_error=True)
print("Performed {0} actions. _: {1}".format(success, _))
'''初始化ES,将数据插入到ES数据库当中'''
def init_ES():
pie = ProcessIntoES()
# 创建ES的index
pie.create_mapping()
start_time = time.time()
index = 0
count = 0
action_list = []
BULK_COUNT = 1000 # 每BULK_COUNT个句子一起插入到ES中
for line in open(pie.music_file,encoding='utf-8'):
if not line:
continue
item = json.loads(line)
index += 1
action = {
"_index": pie._index,
"_type": pie.doc_type,
"_source": {
"question": item['question'],
"answers": '\n'.join(item['answers']),
}
}
action_list.append(action)
if index > BULK_COUNT:
pie.insert_data_bulk(action_list=action_list)
index = 0
count += 1
print(count)
action_list = []
end_time = time.time()
print("Time Cost:{0}".format(end_time - start_time))
if __name__ == "__main__":
# 将数据库插入到elasticsearch当中
init_ES()
# 按照标题进行查询
question = '我老公要起诉离婚 我不想离婚怎么办'
再运行主代码:#!/usr/bin/env python3
# coding: utf-8
# File: crime_qa_server.py
# Author: lhy
# Date: 18-11-10
import os
import time
import json
from elasticsearch import Elasticsearch
import numpy as np
import jieba.posseg as pseg
class CrimeQA:
def __init__(self):
self._index = "crime_data"
self.es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}])
self.doc_type = "crime"
cur = '/'.join(os.path.abspath(__file__).split('/')[:-1])
self.embedding_path = os.path.join(cur, 'embedding/word_vec_300.bin')
self.embdding_dict = self.load_embedding(self.embedding_path)
self.embedding_size = 300
self.min_score = 0.4
self.min_sim = 0.8
'''根据question进行事件的匹配查询'''
def search_specific(self, value, key="question"):
query_body = {
"query": {
"match": {
key: value,
}
}
}
searched = self.es.search(index=self._index, doc_type=self.doc_type, body=query_body, size=20)
# 输出查询到的结果
return searched["hits"]["hits"]
'''基于ES的问题查询'''
def search_es(self, question):
answers = []
res = self.search_specific(question)
for hit in res:
answer_dict = {}
answer_dict['score'] = hit['_score']
answer_dict['sim_question'] = hit['_source']['question']
answer_dict['answers'] = hit['_source']['answers'].split('\n')
answers.append(answer_dict)
return answers
'''加载词向量'''
def load_embedding(self, embedding_path):
embedding_dict = {}
count = 0
for line in open(embedding_path,encoding='utf-8'):
line = line.strip().split(' ')
if len(line) <300:
continue
wd = line[0]
vector = np.array([float(i) for i in line[1:]])
embedding_dict[wd] = vector
count += 1
if count%10000 == 0:
print(count, 'loaded')
print('loaded %s word embedding, finished'%count, )
return embedding_dict
'''对文本进行分词处理'''
def seg_sent(self, s):
wds = [i.word for i in pseg.cut(s) if i.flag[0] not in ['x', 'u', 'c', 'p', 'm', 't']]
return wds
'''基于wordvector,通过lookup table的方式找到句子的wordvector的表示'''
def rep_sentencevector(self, sentence, flag='seg'):
if flag == 'seg':
word_list = [i for i in sentence.split(' ') if i]
else:
word_list = self.seg_sent(sentence)
embedding = np.zeros(self.embedding_size)
sent_len = 0
for index, wd in enumerate(word_list):
if wd in self.embdding_dict:
embedding += self.embdding_dict.get(wd)
sent_len += 1
else:
continue
return embedding/sent_len
'''计算问句与库中问句的相似度,对候选结果加以二次筛选'''
def similarity_cosine(self, vector1, vector2):
cos1 = np.sum(vector1*vector2)
cos21 = np.sqrt(sum(vector1**2))
cos22 = np.sqrt(sum(vector2**2))
similarity = cos1/float(cos21*cos22)
if similarity == 'nan':
return 0
else:
return similarity
'''问答主函数'''
def search_main(self, question):
candi_answers = self.search_es(question)
question_vector = self.rep_sentencevector(question,flag='noseg')
answer_dict = {}
for indx, candi in enumerate(candi_answers):
candi_question = candi['sim_question']
score = candi['score']/100
candi_vector = self.rep_sentencevector(candi_question, flag='noseg')
sim = self.similarity_cosine(question_vector, candi_vector)
if sim continue
final_score = (score + sim)/2
if final_score continue
answer_dict[indx] = final_score
if answer_dict:
answer_dict = sorted(answer_dict.items(), key=lambda asd:asd[1], reverse=True)
final_answer = candi_answers[answer_dict[0][0]]['answers']
else:
final_answer = '您好,对于此类问题,您可以咨询公安部门'
#
# for i in answer_dict:
# answer_indx = i[0]
# score = i[1]
# print(i, score, candi_answers[answer_indx])
# print('******'*6)
return final_answer
if __name__ == "__main__":
handler = CrimeQA()
while(1):
question = input('question:')
final_answer = handler.search_main(question)
print('answers:', final_answer)
最后展示一下成果