热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SPI(ServiceProviderInterface):Flink动态source&sink全靠它

工欲善其事,必先利其器。阅读源码,

SPI概述

      SPI即Service Provider Interface,这玩意是JDK内置的一套服务发现机制,该接口允许第三方实现或者扩展,这些被扩展的接口常常用于寻找服务实现。从这方面来讲,有点类似于IOC功能,装配控制权不在程序只动态指明,这种设计达到的最重要的功能就是解耦,尤其适合于模块化设计领域中。所以,在实际的项目开发过程中,通常都将需求抽象成接口,然后对应不同的需求,以不同的方式实现相同的接口。对接口的一种新的实现,不用修改原先的项目代码,只需要写新的实现,打成jar包。SPI就是通过在ClassPath路径下的META-INF/services文件夹查找文件,自动加载文件里所定义的类。当然,很多开源的优秀的项目中,都有SPI的身影,比如Dubbo、Flink等。

       之所以写这篇文章,就是因为本人在阅读Flink1.11.1源码时发现,新版本里面的source和sink,改用了SPI机制。详细的设计动机和方案见FLIP-95: New TableSource and TableSink interfaces

https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

案例演示

代码详见我的github:https://github.com/felixzh2020/felixzh-learning-java/tree/master/JDKSpi

简单描述:定义一套消息队列的接口,分别使用kafka和rabbitMQ实现这套接口。

消费接口如下:

    public interface MQConsumer {
    public void start();


    public void stop();


    public void subscribe(String topic, MQBusinessHandler topicHandler);


    public void unsubscribe(String topic);
    }

    生产接口如下:

      public interface MQProducer {
      public void start();
      public void stop();
      public boolean send(MQMessage message);
      }

      业务处理逻辑接口如下:

        public interface MQBusinessHandler {
        public boolean doBusiness(MQMessage message);
        }

        消息体类如下:

          public class MQMessage<Timplements Serializable {
          private static final long serialVersiOnUID= 4359709211352400087L;
          private String id;
          private String timeStamp;
          private String topic;
          private String tag;
          private Map headers;
          private T content;


          public static long getSerialVersionUID() {
          return serialVersionUID;
          }


          public String getId() {
          return id;
          }


          public void setId(String id) {
          this.id = id;
          }


          public String getTimeStamp() {
          return timeStamp;
          }


          public void setTimeStamp(String timeStamp) {
          this.timeStamp = timeStamp;
          }


          public String getTopic() {
          return topic;
          }


          public void setTopic(String topic) {
          this.topic = topic;
          }


          public String getTag() {
          return tag;
          }


          public void setTag(String tag) {
          this.tag = tag;
          }


          public Map getHeaders() {
          return headers;
          }


          public void setHeaders(Map headers) {
          this.headers = headers;
          }


          public T getContent() {
          return content;
          }


          public void setContent(T content) {
          this.cOntent= content;
          }
          }


          RabbitMQ实现演示如下:

            public class RMQProducerImpl implements MQProducer {
            @Override
            public void start() {
            System.out.println("rabbitmq producer start ... ");
            }


            @Override
            public void stop() {
            System.out.println("rabbitmq producer stop ... ");
            }


            @Override
            public boolean send(MQMessage message) {
            System.out.println("rabbitmq producer send ... ");
            return true;
            }
            }

            kafka实现演示如下:

              public class KMQProducerImpl implements MQProducer {
              @Override
              public void start() {
              System.out.println("kafka producer start ... ");
              }


              @Override
              public void stop() {
              System.out.println("kafka producer stop ... ");
              }


              @Override
              public boolean send(MQMessage message) {
              System.out.println("kafka producer send ... ");
              return true;
              }
              }

              主函数如下:

                public class Main {
                public static void main(String[] args) {
                ServiceLoader mqProducers = ServiceLoader.load(MQProducer.class);
                for (MQProducer mqProducer : mqProducers) {
                mqProducer.start();
                mqProducer.send(new MQMessage<>());
                mqProducer.stop();
                }
                }
                }

                最终的目录结构如图:

                最重要的一环,com.felixzh.SDKInterface.MQProducer文件内容如下:

                  com.felixzh.KafkaImpl.KMQProducerImpl
                  com.felixzh.RabbitMQImpl.RMQProducerImpl

                  运行结果如下:


                  flink源码里面SPI的案例如下图所示




                  推荐阅读
                  author-avatar
                  frank52_445
                  这个家伙很懒,什么也没留下!
                  PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
                  Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有