热门标签 | HotTags
当前位置:  开发笔记 > 后端 > 正文

FlinkExactlyOnce(如何保证数据的唯一性和不重复!)

Flink-ExactlyOnce(如何保证数据的唯一性和不重复!)kafka中如何保证数据不丢失1.kafka中如何保证数据不丢失2.向kafka中sink数据packageco

Flink-Exactly Once(如何保证数据的唯一性和不重复!) kafka中如何保证数据不丢失

1. kafka中如何保证数据不丢失


2. 向kafka中sink数据

package com.wt.flink.sink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala._
object Demo5KafkaSink {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
val studentDS: DataStream[String] = env.readTextFile("data/students.json")
/**
* 将数据保存到kafka中 --- kafka sink
*
* DeliveryGuarantee.EXACTLY_ONCE: 唯一一次
* DeliveryGuarantee.AT_LEAST_ONCE: 至少一次,默认
*/
val sink: KafkaSink[String] = KafkaSink
.builder[String]()
.setBootstrapServers("master:9092,node1:9092,node2:9092") //broker地址
.setRecordSerializer(
KafkaRecordSerializationSchema
.builder[String]()
.setTopic("students_json") //topic
.setValueSerializationSchema(new SimpleStringSchema())
.build())
//.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) //唯一一次
.build()
//使用kafka sink
studentDS.sinkTo(sink)

env.execute()
//kafka-console-consumer.sh --bootstrap-server master:9092,node2:9092,node2:9092 --from-beginning --topic students_json
}
}

我们在从命令行读取学生的json数据


3 . Flink从kafka中读取数据

package com.wt.flink.core
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
object Demo16ExactlyOnce {
def main(args: Array[String]): Unit = {
/**
* 使用flink从kafka中读取数据,怎么保证数据处理的唯一一次
*
*/
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
/**
* 开启checkpoint
*
*/
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(20000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
//RETAIN_ON_CANCELLATION: 当任务取消时保留checkpoint
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//需要设置flink checkpoint保存的位置
env.setStateBackend(new HashMapStateBackend())
//将状态保存到hdfs中
env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")
/**
* 消费kafka中的数据
*
*/
val source: KafkaSource[String] = KafkaSource.builder[String]
.setBootstrapServers("master:9092,node1:9092,node2:9092")
.setTopics("words")
.setGroupId("Demo16ExactlyOnce")
.setStartingOffsets(OffsetsInitializer.earliest) //只在第一次启动的时候生效,如果开启了checkpoint,任务重启之后会按照checkpoint中保证的偏移量消费数据
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
val kafkaSource: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
val wordsDS: DataStream[String] = kafkaSource.flatMap(_.split(","))
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
countDS.print()
env.execute("Demo16ExactlyOnce")
}
}

4. 从kafka中读取数据,然后再存到kafka中



重点

DeliveryGuarantee.AT_LEAST_ONCE:至少异常,会有重复数据

DeliveryGuarantee.EXACTLY_ONCE: 唯一一次



读取数据的时候.需要指定:

--isolation-level read_committed : 只读已提交的数据

package com.wt.flink.core
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import java.util.Properties
object Demo17ExactlyOnce {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//开启checkpoint
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(20000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
//RETAIN_ON_CANCELLATION: 当任务取消时保留checkpoint
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
/**
* 需要设置flink checkpoint保存状态的位置
*
*/
env.setStateBackend(new HashMapStateBackend())
//将状态保存到hdfs中
env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")
val source: KafkaSource[String] = KafkaSource.builder[String]
.setBootstrapServers("master:9092,node1:9092,node2:9092")
.setTopics("source")
.setGroupId("Demo16ExactlyOnce")
.setStartingOffsets(OffsetsInitializer.earliest) //只在第一次启动的时候生效,如果开启了checkpoint,任务重启之后会按照checkpoint中保证的偏移量消费数据
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
val kafkaSource: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
//过滤空数据
val filterDS: DataStream[String] = kafkaSource.filter(_.nonEmpty)
/**
* DeliveryGuarantee.AT_LEAST_ONCE:至少异常,会有重复数据
* DeliveryGuarantee.EXACTLY_ONCE: 唯一一次
*
*/
//将清洗之后的数据保存到kafka中
val properties = new Properties()
//设置事务的超时时间,要比15分钟小
properties.setProperty("transaction.timeout.ms", 10 * 60 * 1000 + "")
val kafkaSink: KafkaSink[String] = KafkaSink
.builder[String]()
.setBootstrapServers("master:9092,node1:9092,node2:9092") //broker地址
.setKafkaProducerConfig(properties) //设置额外的参数
.setRecordSerializer(
KafkaRecordSerializationSchema
.builder[String]()
.setTopic("sink") //topic
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build()

filterDS.sinkTo(kafkaSink)
/**
* 通过命令好消费sink数据
* --isolation-level read_committed : 只读已提交的数据
* kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --isolation-level read_committed --from-beginning --topic sink
*
*/
env.execute()
}
}


推荐阅读
  • Netflix利用Druid实现高效实时数据分析
    本文探讨了全球领先的在线娱乐公司Netflix如何通过采用Apache Druid,实现了高效的数据采集、处理和实时分析,从而显著提升了用户体验和业务决策的准确性。文章详细介绍了Netflix在系统架构、数据摄取、管理和查询方面的实践,并展示了Druid在大规模数据处理中的卓越性能。 ... [详细]
  • 在本周的白板演练中,Apache Flink 的 PMC 成员及数据工匠首席技术官 Stephan Ewen 深入探讨了如何利用保存点功能进行流处理中的数据重新处理、错误修复、系统升级和 A/B 测试。本文将详细解释保存点的工作原理及其应用场景。 ... [详细]
  • EasyMock实战指南
    本文介绍了如何使用EasyMock进行单元测试,特别是当测试对象的合作者依赖于外部资源或尚未实现时。通过具体的示例,展示了EasyMock在模拟对象行为方面的强大功能。 ... [详细]
  • 收割机|篇幅_国内最牛逼的笔记,不接受反驳!!
    收割机|篇幅_国内最牛逼的笔记,不接受反驳!! ... [详细]
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • 本文探讨了在使用Apache Flink向Kafka发送数据过程中遇到的事务频繁失败问题,并提供了详细的解决方案,包括必要的配置调整和最佳实践。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • 本文介绍了一款用于自动化部署 Linux 服务的 Bash 脚本。该脚本不仅涵盖了基本的文件复制和目录创建,还处理了系统服务的配置和启动,确保在多种 Linux 发行版上都能顺利运行。 ... [详细]
  • 本文详细介绍了如何使用Spring Boot进行高效开发,涵盖了配置、实例化容器以及核心注解的使用方法。 ... [详细]
  • Yii 实现阿里云短信发送 ... [详细]
  • 本文深入探讨了 Java 中的 Serializable 接口,解释了其实现机制、用途及注意事项,帮助开发者更好地理解和使用序列化功能。 ... [详细]
  • 本文总结了在使用Ionic 5进行Android平台APK打包时遇到的问题,特别是针对QRScanner插件的改造。通过详细分析和提供具体的解决方法,帮助开发者顺利打包并优化应用性能。 ... [详细]
  • 一家位于长沙的知名网络安全企业,现面向全国诚聘高级后端开发工程师,特别欢迎具有一线城市经验的技术精英回归故乡,共创辉煌。 ... [详细]
  • 龙蜥社区开发者访谈:技术生涯的三次蜕变 | 第3期
    龙蜥社区的开发者们通过自己的实践和经验,推动着开源技术的发展。本期「龙蜥开发者说」聚焦于一位资深开发者的三次技术转型,分享他在龙蜥社区的成长故事。 ... [详细]
author-avatar
手机用户2602901335
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有