谈到es的中文分词器,肯定少不了ik分词器.现ik分词器有两种获取主词汇和停用词的方法:
一是通过ik\config目录下的main.dic和stopword.dic获取,但是每次修改后要重启才能生效
二是通过提供接口返回所有词汇的接口,接口路径配置在.但是该方式每次都需要将所有词汇返回,效率不高.
本次目的就是通过jdbc直接连接数据库来实现增量更新词汇.我们要做的就是找到添加主词汇和停用词汇的方法,然后再通过jdbc获取数据库词汇来调用该方法来更新词汇
下载ik源码,我下载的是7.17.6本版.因为es使用的是7.17.7,为防止启动报错,下载后我将版本改成了7.17.7.
(1)找到Dictionary.initial方法
可以看到,加载词汇的过程再Dictionary.initial 方法中,在该方法中,加载了各文件的词汇还有通过定时任务来获取接口词汇进行更新.
(2)接下来我们进入到singleton.loadMainDict -> loadExtDict -> loadDictFile方法中
可以看到dict.fillSegment就是添加主词汇
(3)同理的,如下_stopWords.fillSegment就是对停用词的加载
所以我们要做的就是拿到词汇,调用对应的fillSegment来加载词汇就可以了
(1)表设计
主词汇表:
CREATE TABLE `es_dic_main` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`word` varchar(100) NOT NULL COMMENT '词汇',
`moditime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`ifdel` char(1) NOT NULL DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='主词汇'
通用词表:
CREATE TABLE `es_dic_stop` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`word` varchar(100) NOT NULL COMMENT '停用词',
`moditime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`ifdel` char(1) NOT NULL DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='停用词'
(2)在/config目录下创建jdbc配置文件jdbc.properties:
jdbc.url=jdbc:mysql://cckg.liulingjie.cn:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
jdbc.username=账号
jdbc.password=密码
# 主词汇增量查询sql
main.word.sql=SELECT * FROM es_dic_main WHERE moditime >= ?
# 通用词增量查询sql
stop.word.sql=SELECT * FROM es_dic_stop WHERE moditime >= ?
# 执行间隔(秒)
interval=10
(3)pom.xml添加jdbc依赖:
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<version>8.0.21version>
dependency>
&#xff08;4&#xff09;src/main/assemblies/plugin.xml下添加以下内容打包时包含mysql驱动jar包&#xff1a;
<dependencySet>
<outputDirectory/>
<useProjectArtifact>trueuseProjectArtifact>
<useTransitiveFiltering>trueuseTransitiveFiltering>
<includes>
<include>mysql:mysql-connector-javainclude>
includes>
dependencySet>
大致流程&#xff1a;
主要涉及有两个类&#xff0c;一个是Dictionary&#xff0c;一个是自己创建的类JdbcMonitor。
Dictionary&#xff1a;提供读取配置&#xff0c;加载词汇和启动词汇更新任务。
JdbcMonitor功能&#xff1a;是一个实现了Runner接口的类&#xff0c;通过jdbc读取数据库词汇并调用Dictionary的方法加载词汇
&#xff08;1&#xff09;在Dictionary类中添加以下方法提供对词汇的api
代码&#xff1a;
protected void fillSegmentMain(String word) {
_MainDict.fillSegment(word.trim().toCharArray());
}
protected void disableSegmentMain(String word) {
_MainDict.disableSegment(word.trim().toCharArray());
}
protected void fillSegmentStop(String word) {
_StopWords.fillSegment(word.trim().toCharArray());
}
protected void disableSegmentStop(String word) {
_StopWords.disableSegment(word.trim().toCharArray());
}
&#xff08;2&#xff09;在Dictionary构造方法中读取配置jdbc.properties
代码&#xff1a;
public class JdbcConfig {
private String url;
private String username;
private String password;
private String mainWordSql;
private String stopWordSql;
private Integer interval;
// geter,setter省略
}
private Dictionary(Configuration cfg) {
//......省略
// 读取jdbc配置
setJdbcConfig();
}
private void setJdbcConfig() {
Path file &#61; PathUtils.get(getDictRoot(), Dictionary.PATH_JDBC_CONFIG);
Properties properties &#61; null;
try {
properties &#61; new Properties();
properties.load(new FileInputStream(file.toFile()));
} catch (Exception e) {
logger.error("load jdbc.properties failed");
logger.error(e.getMessage());
}
jdbcConfig &#61; new JdbcConfig(
properties.getProperty("jdbc.url"),
properties.getProperty("jdbc.username"),
properties.getProperty("jdbc.password"),
properties.getProperty("main.word.sql"),
properties.getProperty("stop.word.sql"),
Integer.valueOf(properties.getProperty("interval"))
);
}
&#xff08;3&#xff09;声明JdbcMinitor类定时连接数据库读取并更新词汇
package org.wltea.analyzer.dic;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.SpecialPermission;
import org.wltea.analyzer.cfg.JdbcConfig;
import org.wltea.analyzer.help.ESPluginLoggerFactory;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
* &#64;author liulingjie
* &#64;date 2022/11/29 20:36
*/
public class JdbcMonitor implements Runnable {
static {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (Exception e) {
e.getStackTrace();
}
}
/**
* jdbc配置
*/
private JdbcConfig jdbcConfig;
/**
* 主词汇上次更新时间
*/
private Timestamp mainLastModitime &#61; Timestamp.valueOf("2022-01-01 00:00:00");
/**
* 停用词上次更新时间
*/
private Timestamp stopLastModitime &#61; Timestamp.valueOf("2022-01-01 00:00:00");
private static final Logger logger &#61; ESPluginLoggerFactory.getLogger(JdbcMonitor.class.getName());
public JdbcMonitor(JdbcConfig jdbcConfig) {
this.jdbcConfig &#61; jdbcConfig;
}
&#64;Override
public void run() {
SpecialPermission.check();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
this.runUnprivileged();
return null;
});
}
/**
* 加载词汇和停用词
*/
public void runUnprivileged() {
//Dictionary.getSingleton().reLoadMainDict();
loadWords();
}
private void loadWords() {
List<String> mainWords &#61; new ArrayList<>();
List<String> delMainWords &#61; new ArrayList<>();
List<String> stopWords &#61; new ArrayList<>();
List<String> delStopWords &#61; new ArrayList<>();
setAllWordList(mainWords, delMainWords, stopWords, delStopWords);
mainWords.forEach(w -> Dictionary.getSingleton().fillSegmentMain(w));
delMainWords.forEach(w -> Dictionary.getSingleton().disableSegmentMain(w));
stopWords.forEach(w -> Dictionary.getSingleton().fillSegmentStop(w));
delStopWords.forEach(w -> Dictionary.getSingleton().disableSegmentStop(w));
logger.info("ik dic refresh from db. mainLastModitime: {} stopLastModitime: {}", mainLastModitime, stopLastModitime);
}
/**
* 获取主词汇和停用词
*
* &#64;param mainWords
* &#64;param delMainWords
* &#64;param stopWords
* &#64;param delStopWords
*/
private void setAllWordList(List<String> mainWords, List<String> delMainWords, List<String> stopWords, List<String> delStopWords) {
Connection connection &#61; null;
try {
connection &#61; DriverManager.getConnection(jdbcConfig.getUrl(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
setWordList(connection, jdbcConfig.getMainWordSql(), mainLastModitime, mainWords, delMainWords);
setWordList(connection, jdbcConfig.getStopWordSql(), stopLastModitime, stopWords, delStopWords);
} catch (SQLException throwables) {
logger.error("jdbc load words failed: mainLastModitime-{} stopLostMOditime-{}", mainLastModitime, stopLastModitime);
logger.error(throwables.getStackTrace());
} finally {
if (connection !&#61; null) {
try {
connection.close();
} catch (SQLException throwables) {
logger.error("failed to close connection");
logger.error(throwables.getMessage());
}
}
}
}
/**
* 连接数据库获取词汇
*
* &#64;param connection
* &#64;param sql
* &#64;param lastModitime
* &#64;param words
* &#64;param delWords
*/
private void setWordList(Connection connection, String sql, Timestamp lastModitime, List<String> words, List<String> delWords) {
PreparedStatement prepareStatement &#61; null;
ResultSet result &#61; null;
try {
prepareStatement &#61; connection.prepareStatement(sql);
prepareStatement.setTimestamp(1, lastModitime);
result &#61; prepareStatement.executeQuery();
while (result.next()) {
String word &#61; result.getString("word");
Timestamp moditime &#61; result.getTimestamp("moditime");
String ifdel &#61; result.getString("ifdel");
if ("1".equals(ifdel)) {
delWords.add(word);
} else {
words.add(word);
}
// 取最大的时间
if (moditime.after(lastModitime)) {
lastModitime.setTime(moditime.getTime());
}
}
} catch (SQLException throwables) {
logger.error("jdbc load words failed: {}", lastModitime);
logger.error(throwables.getMessage());
} finally {
if (result !&#61; null) {
try {
result.close();
} catch (SQLException throwables) {
logger.error("failed to close prepareStatement");
logger.error(throwables.getMessage());
}
}
if (prepareStatement !&#61; null) {
try {
prepareStatement.close();
} catch (SQLException throwables) {
logger.error("failed to close prepareStatement");
logger.error(throwables.getMessage());
}
}
}
}
}
&#xff08;4&#xff09;最后在Dictionary.initial方法中启用该定时任务
代码&#xff1a;
public static synchronized void initial(Configuration cfg) {
if (singleton&#61;&#61; null) {
synchronized (Dictionary.class) {
if (singleton &#61;&#61; null) {
singleton &#61; new Dictionary(cfg);
......
// 开启数据库增量更新
pool.scheduleAtFixedRate(new JdbcMonitor(singleton.jdbcConfig), 10, singleton.jdbcConfig.getInterval(), TimeUnit.SECONDS);
}
}
}
}
&#xff08;5&#xff09;最后mvn cliean package打包&#xff0c;在~\target\releases下会生成如下包
&#xff08;6&#xff09;解压放入到 es安装路径/plugins/ik
重启es就行了