SPI概述
SPI即Service Provider Interface,这玩意是JDK内置的一套服务发现机制,该接口允许第三方实现或者扩展,这些被扩展的接口常常用于寻找服务实现。从这方面来讲,有点类似于IOC功能,装配控制权不在程序只动态指明,这种设计达到的最重要的功能就是解耦,尤其适合于模块化设计领域中。所以,在实际的项目开发过程中,通常都将需求抽象成接口,然后对应不同的需求,以不同的方式实现相同的接口。对接口的一种新的实现,不用修改原先的项目代码,只需要写新的实现,打成jar包。SPI就是通过在ClassPath路径下的META-INF/services文件夹查找文件,自动加载文件里所定义的类。当然,很多开源的优秀的项目中,都有SPI的身影,比如Dubbo、Flink等。
之所以写这篇文章,就是因为本人在阅读Flink1.11.1源码时发现,新版本里面的source和sink,改用了SPI机制。详细的设计动机和方案见FLIP-95: New TableSource and TableSink interfaces
https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
案例演示
代码详见我的github:https://github.com/felixzh2020/felixzh-learning-java/tree/master/JDKSpi
简单描述:定义一套消息队列的接口,分别使用kafka和rabbitMQ实现这套接口。
消费接口如下:
public interface MQConsumer {
public void start();
public void stop();
public void subscribe(String topic, MQBusinessHandler topicHandler);
public void unsubscribe(String topic);
}
生产接口如下:
public interface MQProducer {
public void start();
public void stop();
public boolean send(MQMessage> message);
}
业务处理逻辑接口如下:
public interface MQBusinessHandler {
public boolean doBusiness(MQMessage> message);
}
消息体类如下:
public class MQMessage<T> implements Serializable {
private static final long serialVersiOnUID= 4359709211352400087L;
private String id;
private String timeStamp;
private String topic;
private String tag;
private Map headers;
private T content;
public static long getSerialVersionUID() {
return serialVersionUID;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(String timeStamp) {
this.timeStamp = timeStamp;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public Map getHeaders() {
return headers;
}
public void setHeaders(Map headers) {
this.headers = headers;
}
public T getContent() {
return content;
}
public void setContent(T content) {
this.cOntent= content;
}
}
RabbitMQ实现演示如下:
public class RMQProducerImpl implements MQProducer {
@Override
public void start() {
System.out.println("rabbitmq producer start ... ");
}
@Override
public void stop() {
System.out.println("rabbitmq producer stop ... ");
}
@Override
public boolean send(MQMessage> message) {
System.out.println("rabbitmq producer send ... ");
return true;
}
}
kafka实现演示如下:
public class KMQProducerImpl implements MQProducer {
@Override
public void start() {
System.out.println("kafka producer start ... ");
}
@Override
public void stop() {
System.out.println("kafka producer stop ... ");
}
@Override
public boolean send(MQMessage> message) {
System.out.println("kafka producer send ... ");
return true;
}
}
主函数如下:
public class Main {
public static void main(String[] args) {
ServiceLoader mqProducers = ServiceLoader.load(MQProducer.class);
for (MQProducer mqProducer : mqProducers) {
mqProducer.start();
mqProducer.send(new MQMessage<>());
mqProducer.stop();
}
}
}
最终的目录结构如图:
最重要的一环,com.felixzh.SDKInterface.MQProducer文件内容如下:
com.felixzh.KafkaImpl.KMQProducerImpl
com.felixzh.RabbitMQImpl.RMQProducerImpl
运行结果如下: