热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

博文推荐|Flink消费Kafka实时写入ApacheDoris(KFD)

本文主要介绍通过Doris提供的StreamLoad结合Flink计算引擎怎么实现数

1.概述

Apache Doris(原百度 Palo )是一款基于大规模并行处理技术的分布式 SQL 数据仓库,由百度在 2017 年开源,2018 年 8 月进入 Apache 孵化器。

Apache Doris 是一个现代化的 MPP 分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris 的分布式架构非常简洁,易于运维,并且可以支持 10PB 以上的超大数据集。

Apache Doris 可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!

2.场景介绍

这里我们介绍的是通过 Doris 提供的 Stream Load 结合 Flink 计算引擎怎么实现数据实时快速入库操作。

使用环境如下:

  mysql 5.x/8.x (主要是业务数据库)  kafka 2.11 (消息队列)  flink 1.10.1 (流式计算引擎)  doris 0.14.7 (核心数仓)  Canal (Mysql binlog数据采集工具)

3.实现方案

这里我们采用的历史数据离线处理+增量数据实时处理的架构

3.1 历史数据离线处理

历史数据离线处理方式,这里我们使用是 Doris ODBC 外表方式,将 MySQL 的表映射到 Doris 里,然后使用

  insert into  select * from

3.1.1 外表创建方法

  1. 首先 Apache Doris 0.13.x以上版本

  2. 要在所有的 BE 节点安装对应数据的 ODBC 驱动

  3. 创建外表

具体可以参考我的另外一篇文章,这里不多做介绍。

Apache doris ODBC外表使用方式

3.2 增量数据实时处理

增量数据的实时处理,这里我们是通过 Canal 监控 MySQL Binlog 解析并推送到指定的 Kafka 队列,然后通过 Flink 去实时消费 Kafka 队列的数据,然后你可以根据自己的需要对数据进行处理,算法等,最后将明细数据或者实时计算的中间结果保存到对应的 Doris 数据表中,这里使用的是 Stream Load,你可以使用 Flink Doris Connector 。

3.2.1 Doris Sink实现

这里我们首先实现一个 Flink Doris Sink

 import com.alibaba.fastjson.JSON; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory;  import java.util.ArrayList; import java.util.Arrays; import java.util.List;  /**  * 自定义flink doris sink  */ public class DorisSink extends RichSinkFunction {      private static final Logger log = LoggerFactory.getLogger(DorisSink.class);      private final static List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));      private DorisStreamLoad dorisStreamLoad;      private String columns;      private String jsonFormat;      public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {         this.dorisStreamLoad = dorisStreamLoad;         this.columns = columns;         this.jsOnFormat= jsonFormat;    }      @Override     public void open(Configuration parameters) throws Exception {         super.open(parameters);    }       /**      * 判断StreamLoad是否成功      *      * @param respContent streamload返回的响应信息(JSON格式)      * @return      */     public static Boolean checkStreamLoadStatus(RespContent respContent) {         if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())                 && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {             return true;        } else {             return false;        }    }      @Override     public void invoke(String value, Context context) throws Exception {         DorisStreamLoad.LoadResponse loadRespOnse= dorisStreamLoad.loadBatch(value, columns, jsonFormat);         if (loadResponse != null && loadResponse.status == 200) {             RespContent respCOntent= JSON.parseObject(loadResponse.respContent, RespContent.class);             if (!checkStreamLoadStatus(respContent)) {                 log.error("Stream Load fail{}:", loadResponse);            }        } else {             log.error("Stream Load Request failed:{}", loadResponse);        }    } }

3.2.2 Stream Load 工具类

 import org.slf4j.Logger; import org.slf4j.LoggerFactory;   import java.io.Serializable; import java.io.IOException; import java.io.BufferedOutputStream; import java.io.InputStream; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Calendar; import java.util.UUID;   /**  * doris streamLoad  */  public class DorisStreamLoad implements Serializable {      private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class); //连接地址,这里使用的是连接FE     private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";     //fe ip地址     private String hostPort;     //数据库     private String db;     //要导入的数据表名     private String tbl;     //用户名     private String user;     //密码     private String passwd;     private String loadUrlStr;     private String authEncoding;       public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {         this.hostPort = hostPort;         this.db = db;         this.tbl = tbl;         this.user = user;         this.passwd = passwd;         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));    } //获取http连接信息     private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {         URL url = new URL(urlStr);         HttpURLConnection cOnn= (HttpURLConnection) url.openConnection();         conn.setInstanceFollowRedirects(false);         conn.setRequestMethod("PUT");         conn.setRequestProperty("Authorization", "Basic " + authEncoding);         conn.addRequestProperty("Expect", "100-continue");         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");         conn.addRequestProperty("label", label);         conn.addRequestProperty("max_filter_ratio", "0");         conn.addRequestProperty("strict_mode", "true");         conn.addRequestProperty("columns", columns);         conn.addRequestProperty("format", "json");         conn.addRequestProperty("jsonpaths", jsonformat);         conn.addRequestProperty("strip_outer_array", "true");         conn.setDoOutput(true);         conn.setDoInput(true);          return conn;    }      public static class LoadResponse {         public int status;         public String respMsg;         public String respContent;          public LoadResponse(int status, String respMsg, String respContent) {             this.status = status;             this.respMsg = respMsg;             this.respCOntent= respContent;        }          @Override         public String toString() {             StringBuilder sb = new StringBuilder();             sb.append("status: ").append(status);             sb.append(", resp msg: ").append(respMsg);             sb.append(", resp content: ").append(respContent);             return sb.toString();        }    } //执行数据导入     public LoadResponse loadBatch(String data, String columns, String jsonformat) {         Calendar calendar = Calendar.getInstance();         //导入的lable,全局唯一         String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",                 calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),                 calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),                 UUID.randomUUID().toString().replaceAll("-", ""));          HttpURLConnection feCOnn= null;         HttpURLConnection beCOnn= null;         try {             // build request and send to fe             feCOnn= getConnection(loadUrlStr, label, columns, jsonformat);             int status = feConn.getResponseCode();             // fe send back http response code TEMPORARY_REDIRECT 307 and new be location             if (status != 307) {                 throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);            }             String location = feConn.getHeaderField("Location");             if (location == null) {                 throw new Exception("redirect location is null");            }             // build request and send to new be location             beCOnn= getConnection(location, label, columns, jsonformat);             // send data to be             BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());             bos.write(data.getBytes());             bos.close();              // get respond             status = beConn.getResponseCode();             String respMsg = beConn.getResponseMessage();             InputStream stream = (InputStream) beConn.getContent();             BufferedReader br = new BufferedReader(new InputStreamReader(stream));             StringBuilder respOnse= new StringBuilder();             String line;             while ((line = br.readLine()) != null) {                 response.append(line);            }             return new LoadResponse(status, respMsg, response.toString());         } catch (Exception e) {             e.printStackTrace();             String err = "failed to load audit via AuditLoader plugin with label: " + label;             log.warn(err, e);             return new LoadResponse(-1, e.getMessage(), err);        } finally {             if (feConn != null) {                 feConn.disconnect();            }             if (beConn != null) {                 beConn.disconnect();            }        }    }  }

3.2.3 Flink Job

这个地方演示的是单表,如果是你通过 Canal 监听的多个表的数据,这里你需要根据表名进行区分,并和你 MySQL 表和 Doris 里的表建好对应关系,解析相应的数据即可

 import org.apache.doris.demo.flink.DorisSink; import org.apache.doris.demo.flink.DorisStreamLoad; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;  import java.util.Properties;  /**  *  * This example mainly demonstrates how to use flink to stream Kafka data.  * And use the doris streamLoad method to write the data into the table specified by doris  *

  * Kafka data format is an array, For example: ["id":1,"name":"root"]  */  public class FlinkKafka2Doris {     //kafka address     private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";     //kafka groupName     private static final String groupName = "test_flink_doris_group";     //kafka topicName     private static final String topicName = "test_flink_doris";     //doris ip port     private static final String hostPort = "xxx:8030";     //doris dbName     private static final String dbName = "db1";     //doris tbName     private static final String tbName = "tb1";     //doris userName     private static final String userName = "root";     //doris password     private static final String password = "";     //doris columns     private static final String columns = "name,age,price,sale";     //json format     private static final String jsOnFormat= "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";      public static void main(String[] args) throws Exception {          Properties props = new Properties();         props.put("bootstrap.servers", bootstrapServer);         props.put("group.id", groupName);         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         props.put("auto.offset.reset", "earliest");         props.put("max.poll.records", "10000");          StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();         blinkStreamEnv.enableCheckpointing(10000);         blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);          FlinkKafkaConsumer flinkKafkaCOnsumer= new FlinkKafkaConsumer<>(topicName,                 new SimpleStringSchema(),                 props);          DataStreamSource dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);          DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);          dataStreamSource.addSink(new DorisSink(dorisStreamLoad,columns,jsonFormat));          blinkStreamEnv.execute("flink kafka to doris");     } }

然后将 Flink Job 提交到集群上就可以运行了,数据就可以试试入库

这里其实是一个微批处理,你可以自己完善以下几部分:

  1. 每个批次最大入库记录数,或者每个多少秒进行一次入库,如果你的实时数据量比较小,或者你的数据比较大,这两条件哪个先到执行哪个

  2. 这里连接是 FE,你可以通过 FE 的 rest api 接口拿到所有的 BE 节点,直接连接 BE 进行入库,URL 地址只是将 FE 的 IP 和端口换成 BE 的 IP 及 HTTP 端口即可

  3. 为了避免你连接这个 BE 或者 FE 的时候,正好这个节点挂了,你可以进行重试其他 FE 或者 BE

  4. 为了避免单个节点压力,你可以进行轮询 BE 节点,不要每次都连接同一个 BE 节点

  5. 设置最大重试次数,如果超过这个次数,可以将导入失败的数据推送到 Kafka 队列,以方便后续人工手动处理

4.总结

本文只是抛砖引玉的方式给大家一个使用 Stream Load 进行数据接入的使用方式及示例,Doris还有很多数据接入的方式等待大家去探索。



推荐阅读
  • Python 数据可视化实战指南
    本文详细介绍如何使用 Python 进行数据可视化,涵盖从环境搭建到具体实例的全过程。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 如何在Linux服务器上配置MySQL和Tomcat的开机自动启动
    在Linux服务器上部署Web项目时,通常需要确保MySQL和Tomcat服务能够随系统启动而自动运行。本文将详细介绍如何在Linux环境中配置MySQL和Tomcat的开机自启动,以确保服务的稳定性和可靠性。通过合理的配置,可以有效避免因服务未启动而导致的项目故障。 ... [详细]
  • MySQL的查询执行流程涉及多个关键组件,包括连接器、查询缓存、分析器和优化器。在服务层,连接器负责建立与客户端的连接,查询缓存用于存储和检索常用查询结果,以提高性能。分析器则解析SQL语句,生成语法树,而优化器负责选择最优的查询执行计划。这一流程确保了MySQL能够高效地处理各种复杂的查询请求。 ... [详细]
  • 数字经济浪潮下企业人才需求变化,优质IT培训机构助力技能提升
    随着云计算、大数据、人工智能、区块链和5G等技术的迅猛发展,数字经济已成为推动经济增长的重要动力。据信通院数据,2020年中国数字经济占GDP比重达38.6%,整体规模突破39.2万亿元。本文探讨了企业在数字化转型中对技术人才的需求变化,并介绍了优质IT培训机构如何助力人才培养。 ... [详细]
  • 2020年9月15日,Oracle正式发布了最新的JDK 15版本。本次更新带来了许多新特性,包括隐藏类、EdDSA签名算法、模式匹配、记录类、封闭类和文本块等。 ... [详细]
  • 包含phppdoerrorcode的词条 ... [详细]
  • Spring Data JdbcTemplate 入门指南
    本文将介绍如何使用 Spring JdbcTemplate 进行数据库操作,包括查询和插入数据。我们将通过一个学生表的示例来演示具体步骤。 ... [详细]
  • 本文详细介绍了如何使用 Python 进行主成分分析(PCA),包括数据导入、预处理、模型训练和结果可视化等步骤。通过具体的代码示例,帮助读者理解和应用 PCA 技术。 ... [详细]
  • Ihavetwomethodsofgeneratingmdistinctrandomnumbersintherange[0..n-1]我有两种方法在范围[0.n-1]中生 ... [详细]
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • importpymysql#一、直接连接mysql数据库'''coonpymysql.connect(host'192.168.*.*',u ... [详细]
  • 本文详细介绍了数据库并发控制的基本概念、重要性和具体实现方法。并发控制是确保多个事务在同时操作数据库时保持数据一致性的关键机制。文章涵盖了锁机制、多版本并发控制(MVCC)、乐观并发控制和悲观并发控制等内容。 ... [详细]
  • 优化后的标题:Apache Cassandra数据写入操作详解
    本文详细解析了 Apache Cassandra 中的数据写入操作,重点介绍了 INSERT 命令的使用方法。该命令主要用于将数据插入到指定表的列中,其基本语法为 `INSERT INTO 表名 (列1, 列2, ...) VALUES (值1, 值2, ...)`。通过具体的示例和应用场景,文章深入探讨了如何高效地执行数据写入操作,以提升系统的性能和可靠性。 ... [详细]
author-avatar
陈翔_是学长
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有