热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

博文推荐|Flink消费Kafka实时写入ApacheDoris(KFD)

本文主要介绍通过Doris提供的StreamLoad结合Flink计算引擎怎么实现数

1.概述

Apache Doris(原百度 Palo )是一款基于大规模并行处理技术的分布式 SQL 数据仓库,由百度在 2017 年开源,2018 年 8 月进入 Apache 孵化器。

Apache Doris 是一个现代化的 MPP 分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris 的分布式架构非常简洁,易于运维,并且可以支持 10PB 以上的超大数据集。

Apache Doris 可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!

2.场景介绍

这里我们介绍的是通过 Doris 提供的 Stream Load 结合 Flink 计算引擎怎么实现数据实时快速入库操作。

使用环境如下:

  mysql 5.x/8.x (主要是业务数据库)  kafka 2.11 (消息队列)  flink 1.10.1 (流式计算引擎)  doris 0.14.7 (核心数仓)  Canal (Mysql binlog数据采集工具)

3.实现方案

这里我们采用的历史数据离线处理+增量数据实时处理的架构

3.1 历史数据离线处理

历史数据离线处理方式,这里我们使用是 Doris ODBC 外表方式,将 MySQL 的表映射到 Doris 里,然后使用

  insert into  select * from

3.1.1 外表创建方法

  1. 首先 Apache Doris 0.13.x以上版本

  2. 要在所有的 BE 节点安装对应数据的 ODBC 驱动

  3. 创建外表

具体可以参考我的另外一篇文章,这里不多做介绍。

Apache doris ODBC外表使用方式

3.2 增量数据实时处理

增量数据的实时处理,这里我们是通过 Canal 监控 MySQL Binlog 解析并推送到指定的 Kafka 队列,然后通过 Flink 去实时消费 Kafka 队列的数据,然后你可以根据自己的需要对数据进行处理,算法等,最后将明细数据或者实时计算的中间结果保存到对应的 Doris 数据表中,这里使用的是 Stream Load,你可以使用 Flink Doris Connector 。

3.2.1 Doris Sink实现

这里我们首先实现一个 Flink Doris Sink

 import com.alibaba.fastjson.JSON; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory;  import java.util.ArrayList; import java.util.Arrays; import java.util.List;  /**  * 自定义flink doris sink  */ public class DorisSink extends RichSinkFunction {      private static final Logger log = LoggerFactory.getLogger(DorisSink.class);      private final static List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));      private DorisStreamLoad dorisStreamLoad;      private String columns;      private String jsonFormat;      public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {         this.dorisStreamLoad = dorisStreamLoad;         this.columns = columns;         this.jsOnFormat= jsonFormat;    }      @Override     public void open(Configuration parameters) throws Exception {         super.open(parameters);    }       /**      * 判断StreamLoad是否成功      *      * @param respContent streamload返回的响应信息(JSON格式)      * @return      */     public static Boolean checkStreamLoadStatus(RespContent respContent) {         if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())                 && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {             return true;        } else {             return false;        }    }      @Override     public void invoke(String value, Context context) throws Exception {         DorisStreamLoad.LoadResponse loadRespOnse= dorisStreamLoad.loadBatch(value, columns, jsonFormat);         if (loadResponse != null && loadResponse.status == 200) {             RespContent respCOntent= JSON.parseObject(loadResponse.respContent, RespContent.class);             if (!checkStreamLoadStatus(respContent)) {                 log.error("Stream Load fail{}:", loadResponse);            }        } else {             log.error("Stream Load Request failed:{}", loadResponse);        }    } }

3.2.2 Stream Load 工具类

 import org.slf4j.Logger; import org.slf4j.LoggerFactory;   import java.io.Serializable; import java.io.IOException; import java.io.BufferedOutputStream; import java.io.InputStream; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Calendar; import java.util.UUID;   /**  * doris streamLoad  */  public class DorisStreamLoad implements Serializable {      private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class); //连接地址,这里使用的是连接FE     private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";     //fe ip地址     private String hostPort;     //数据库     private String db;     //要导入的数据表名     private String tbl;     //用户名     private String user;     //密码     private String passwd;     private String loadUrlStr;     private String authEncoding;       public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {         this.hostPort = hostPort;         this.db = db;         this.tbl = tbl;         this.user = user;         this.passwd = passwd;         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));    } //获取http连接信息     private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {         URL url = new URL(urlStr);         HttpURLConnection cOnn= (HttpURLConnection) url.openConnection();         conn.setInstanceFollowRedirects(false);         conn.setRequestMethod("PUT");         conn.setRequestProperty("Authorization", "Basic " + authEncoding);         conn.addRequestProperty("Expect", "100-continue");         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");         conn.addRequestProperty("label", label);         conn.addRequestProperty("max_filter_ratio", "0");         conn.addRequestProperty("strict_mode", "true");         conn.addRequestProperty("columns", columns);         conn.addRequestProperty("format", "json");         conn.addRequestProperty("jsonpaths", jsonformat);         conn.addRequestProperty("strip_outer_array", "true");         conn.setDoOutput(true);         conn.setDoInput(true);          return conn;    }      public static class LoadResponse {         public int status;         public String respMsg;         public String respContent;          public LoadResponse(int status, String respMsg, String respContent) {             this.status = status;             this.respMsg = respMsg;             this.respCOntent= respContent;        }          @Override         public String toString() {             StringBuilder sb = new StringBuilder();             sb.append("status: ").append(status);             sb.append(", resp msg: ").append(respMsg);             sb.append(", resp content: ").append(respContent);             return sb.toString();        }    } //执行数据导入     public LoadResponse loadBatch(String data, String columns, String jsonformat) {         Calendar calendar = Calendar.getInstance();         //导入的lable,全局唯一         String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",                 calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),                 calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),                 UUID.randomUUID().toString().replaceAll("-", ""));          HttpURLConnection feCOnn= null;         HttpURLConnection beCOnn= null;         try {             // build request and send to fe             feCOnn= getConnection(loadUrlStr, label, columns, jsonformat);             int status = feConn.getResponseCode();             // fe send back http response code TEMPORARY_REDIRECT 307 and new be location             if (status != 307) {                 throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);            }             String location = feConn.getHeaderField("Location");             if (location == null) {                 throw new Exception("redirect location is null");            }             // build request and send to new be location             beCOnn= getConnection(location, label, columns, jsonformat);             // send data to be             BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());             bos.write(data.getBytes());             bos.close();              // get respond             status = beConn.getResponseCode();             String respMsg = beConn.getResponseMessage();             InputStream stream = (InputStream) beConn.getContent();             BufferedReader br = new BufferedReader(new InputStreamReader(stream));             StringBuilder respOnse= new StringBuilder();             String line;             while ((line = br.readLine()) != null) {                 response.append(line);            }             return new LoadResponse(status, respMsg, response.toString());         } catch (Exception e) {             e.printStackTrace();             String err = "failed to load audit via AuditLoader plugin with label: " + label;             log.warn(err, e);             return new LoadResponse(-1, e.getMessage(), err);        } finally {             if (feConn != null) {                 feConn.disconnect();            }             if (beConn != null) {                 beConn.disconnect();            }        }    }  }

3.2.3 Flink Job

这个地方演示的是单表,如果是你通过 Canal 监听的多个表的数据,这里你需要根据表名进行区分,并和你 MySQL 表和 Doris 里的表建好对应关系,解析相应的数据即可

 import org.apache.doris.demo.flink.DorisSink; import org.apache.doris.demo.flink.DorisStreamLoad; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;  import java.util.Properties;  /**  *  * This example mainly demonstrates how to use flink to stream Kafka data.  * And use the doris streamLoad method to write the data into the table specified by doris  *

  * Kafka data format is an array, For example: ["id":1,"name":"root"]  */  public class FlinkKafka2Doris {     //kafka address     private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";     //kafka groupName     private static final String groupName = "test_flink_doris_group";     //kafka topicName     private static final String topicName = "test_flink_doris";     //doris ip port     private static final String hostPort = "xxx:8030";     //doris dbName     private static final String dbName = "db1";     //doris tbName     private static final String tbName = "tb1";     //doris userName     private static final String userName = "root";     //doris password     private static final String password = "";     //doris columns     private static final String columns = "name,age,price,sale";     //json format     private static final String jsOnFormat= "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";      public static void main(String[] args) throws Exception {          Properties props = new Properties();         props.put("bootstrap.servers", bootstrapServer);         props.put("group.id", groupName);         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         props.put("auto.offset.reset", "earliest");         props.put("max.poll.records", "10000");          StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();         blinkStreamEnv.enableCheckpointing(10000);         blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);          FlinkKafkaConsumer flinkKafkaCOnsumer= new FlinkKafkaConsumer<>(topicName,                 new SimpleStringSchema(),                 props);          DataStreamSource dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);          DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);          dataStreamSource.addSink(new DorisSink(dorisStreamLoad,columns,jsonFormat));          blinkStreamEnv.execute("flink kafka to doris");     } }

然后将 Flink Job 提交到集群上就可以运行了,数据就可以试试入库

这里其实是一个微批处理,你可以自己完善以下几部分:

  1. 每个批次最大入库记录数,或者每个多少秒进行一次入库,如果你的实时数据量比较小,或者你的数据比较大,这两条件哪个先到执行哪个

  2. 这里连接是 FE,你可以通过 FE 的 rest api 接口拿到所有的 BE 节点,直接连接 BE 进行入库,URL 地址只是将 FE 的 IP 和端口换成 BE 的 IP 及 HTTP 端口即可

  3. 为了避免你连接这个 BE 或者 FE 的时候,正好这个节点挂了,你可以进行重试其他 FE 或者 BE

  4. 为了避免单个节点压力,你可以进行轮询 BE 节点,不要每次都连接同一个 BE 节点

  5. 设置最大重试次数,如果超过这个次数,可以将导入失败的数据推送到 Kafka 队列,以方便后续人工手动处理

4.总结

本文只是抛砖引玉的方式给大家一个使用 Stream Load 进行数据接入的使用方式及示例,Doris还有很多数据接入的方式等待大家去探索。



推荐阅读
  • 语法:CREATE[索引类型]INDEX索引名称ON表名(列名)WITHFILLFACTOR填充因子值0~100GOUSE库名GOIFEXISTS(SELECT*FR ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
  • 云原生应用最佳开发实践之十二原则(12factor)
    目录简介一、基准代码二、依赖三、配置四、后端配置五、构建、发布、运行六、进程七、端口绑定八、并发九、易处理十、开发与线上环境等价十一、日志十二、进程管理当 ... [详细]
  • 提供:ZStack云计算原创2016-12-26张鑫讲师介绍张鑫ZStack总架构师、联合创始人《系统虚拟化》主要作者,曾任职Intel开源软件技术中心 ... [详细]
  • Java工程师书单(初级,中级,高级)
    简介怎样学习才能从一名Java初级程序员成长为一名合格的架构师,或者说一名合格的架构师应该有怎样的技术知识体系,这是不仅一个刚刚踏入职场的初级程序员也是工作一两年之后开始迷茫的程序 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 本文由编程笔记小编整理,介绍了PHP中的MySQL函数库及其常用函数,包括mysql_connect、mysql_error、mysql_select_db、mysql_query、mysql_affected_row、mysql_close等。希望对读者有一定的参考价值。 ... [详细]
  • Java学习笔记之使用反射+泛型构建通用DAO
    本文介绍了使用反射和泛型构建通用DAO的方法,通过减少代码冗余度来提高开发效率。通过示例说明了如何使用反射和泛型来实现对不同表的相同操作,从而避免重复编写相似的代码。该方法可以在Java学习中起到较大的帮助作用。 ... [详细]
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
  • STL迭代器的种类及其功能介绍
    本文介绍了标准模板库(STL)定义的五种迭代器的种类和功能。通过图表展示了这几种迭代器之间的关系,并详细描述了各个迭代器的功能和使用方法。其中,输入迭代器用于从容器中读取元素,输出迭代器用于向容器中写入元素,正向迭代器是输入迭代器和输出迭代器的组合。本文的目的是帮助读者更好地理解STL迭代器的使用方法和特点。 ... [详细]
  • 2021最新总结网易/腾讯/CVTE/字节面经分享(附答案解析)
    本文分享作者在2021年面试网易、腾讯、CVTE和字节等大型互联网企业的经历和问题,包括稳定性设计、数据库优化、分布式锁的设计等内容。同时提供了大厂最新面试真题笔记,并附带答案解析。 ... [详细]
  • Flink(三)IDEA开发Flink环境搭建与测试
    一.IDEA开发环境1.pom文件设置1.8 ... [详细]
author-avatar
陈翔_是学长
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有