热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用Kafka和Flink构建实时数据处理系统

引言在很多领域,如股市走向分析,气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大ÿ

引言

在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要。流计算的出现,就是为了更好地解决这类数据在处理过程中遇到的问题。与传统架构不同,流计算模型在数据流动的过程中实时地进行捕捉和处理,并根据业务需求对数据进行计算分析,最终把结果保存或者分发给需要的组件。本文将从实时数据产生和流向的各个环节出发,通过一个具有实际意义的案例,向读者介绍如何使用 Apache Kafka 和 Apache Flink 构建一个实时的数据处理系统,当然本文只是抛砖引玉,因为构建一个良好健壮的实时数据处理系统并不是一篇文章可以说清楚的。在阅读本文前,假设您已经对 Apache Kafka 分布式消息系统有了基本的了解,并且可以使用 Flink SQL 编写业务逻辑。接下来,就让我们一起看看如何构建一个简易的实时数据处理系统吧。

关于Kafka

Kafka 是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统,最早是由 Linkedin 开发,并于 2011 年开源并贡献给 Apache 软件基金会。一般来说,Kafka 有以下几个典型的应用场景:
  • 作为消息队列。由于 Kafka 拥有高吞吐量,并且内置消息主题分区,备份,容错等特性,使得它更适合使用在大规模,高强度的消息数据处理的系统中。
  • 流计算系统的数据源。流数据产生系统作为 Kafka 消息数据的生产者将数据流分发给 Kafka 消息主题,流数据计算系统 (Flink,Storm,Spark Streaming等) 实时消费并计算数据。这也是本文将要介绍的应用场景。
  • 系统用户行为数据源。这种场景下,系统将用户的行为数据,如访问页面,停留时间,搜索日志,感兴趣的话题等数据实时或者周期性的发布到 Kafka 消息主题,作为对接系统数据的来源。
  • 日志聚集。Kafka 可以作为一个日志收集系统的替代解决方案,我们可以将系统日志数据按类别汇集到不同的 Kafka 消息主题中。
  • 事件源。在基于事件驱动的系统中,我们可以将事件设计成合理的格式,作为 Kafka 消息数据存储起来,以便相应系统模块做实时或者定期处理。由于 Kafka 支持大数据量存储,并且有备份和容错机制,所以可以让事件驱动型系统更加健壮和高效。
当然 Kafka 还可以支持其他的应用场景,在这里我们就不一一罗列了。关于 Kafka 更详细的介绍,请读者参考Kafka 官网。需要指出的是,本文使用的 Kafka 版本是基于 Scala 2.10 版本构建的 0.8.2.1 版本。

关于Flink

Flink支持多种数据源:Kafka、MQ、SLS、Datahub 等,原生支持写入到 MQ、OTS、常见关系数据库等存储介质,提供了不同的抽象级别以开发流式或批处理应用。
1534815616178-0df91477-37ea-4f20-baa1-a2
  • 最底层级的抽象仅仅提供了有状态流。它将通过过程函数(Process Function)嵌入到DataStream API中。它允许用户可以自由地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。
  • 实际上,大多数应用并不需要上述的低层级抽象,而是针对 核心API(Core APIs) 进行编程,比如DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些流畅的API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows),状态(state)等等。这些API处理的数据类型以类(classes)的形式由各自的编程语言所表示。
低层级的 过程函数 与 DataStream API 相集成,使其可以对某些特定的操作进行低层级的抽象。DataSet API 为有界数据集提供了额外的原语,例如循环与迭代。
  • Table API 是以  为中心的声明式DSL,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表具有附加的模式(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。Table API程序声明式地定义了 什么逻辑操作应该执行 而不是准确地确定 这些操作代码的看上去如何 。 尽管Table API可以通过多种类型的用户定义的函数进行扩展,其仍不如 核心API 更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API程序还可以在执行之前通过应用优化规则的优化器。
你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
  • Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。

案例介绍与Flink SQL编程实现

1.案例介绍

该案例中,我们假设某论坛需要根据用户对站内网页的点击量,停留时间,以及是否点赞,来近实时的计算网页热度,进而动态的更新网站的今日热点模块,把最热话题的链接显示其中。

2.案例分析

对于某一个访问论坛的用户,我们需要对他的行为数据做一个抽象,以便于解释网页话题热度的计算过程。
首先,我们通过一个向量来定义用户对于某个网页的行为即点击的网页,开始时间,停留时间,以及是否点赞,可以表示如下:
(page001.html,0, 1, 0.5, 1)
向量的第一项表示网页的 ID,第二项表示用户网页开始点击时间,第三项表示从进入网站到离开对该网页的点击次数,第四项表示停留时间,以秒为单位,第五项是代表是否点赞,1 为赞,-1 表示踩,0 表示中立。
其次,我们再按照各个行为对计算网页话题热度的贡献,给其设定一个权重,在本文中,我们假设点击次数权重是 0.8,因为用户可能是由于没有其他更好的话题,所以再次浏览这个话题。停留时间权重是 0.8,因为用户可能同时打开多个 tab 页,但他真正关注的只是其中一个话题。是否点赞权重是 1,因为这一般表示用户对该网页的话题很有兴趣。
最后,我们定义用下列公式计算某条行为数据对于该网页热度的贡献值。
f(x,y,z)=0.8x+0.8y+z
那么对于上面的行为数据 (page001.html, 1, 0.5, 1),利用公式可得:
H(page001)=f(x,y,z)= 0.8x+0.8y+z=0.8*1+0.8*0.5+1*1=2.2
读者可以留意到,在这个过程中,我们忽略了用户本身,也就是说我们不关注用户是谁,而只关注它对于网页热度所做的贡献。

3.生产行为数据信息

在本案例中我们将使用一段程序来模拟用户行为,该程序每隔 5 秒钟会随机的向 user-behavior-topic 主题推送 0 到 50 条行为数据消息,显然,这个程序扮演消息生产者的角色,在实际应用中,这个功能一般会由一个系统来提供。为了简化消息处理,我们定义消息的格式如下:
网页 ID|开始点击时间|点击次数|停留时间 (分钟)|是否点赞
ID(varchar)|firsttime(timestamp)|count(bigint)|timegap(bigint)|positive(boolean)

4.编写Flink SQL编写程序实时处理数据

在弄清楚了要解决的问题之后,就可以开始编码实现了。对于本案例中的问题,在实现上的基本步骤如下:
  • 创建源表和结果表
  • 创建子查询,使用 ROW_NUMBER() 窗口函数来对数据根据排序列进行排序并标上排名
  • 外层查询中,对排名进行过滤,只取前N条,如N=5,那么就是取 Top 5 的数据,即网页热度排名


-- Kafka源表
create table input(
id varchar,
count bigint,
timegap bigint,
positive boolean,
firsttime timestamp,
WATERMARK wk FOR firsttime as withOffset(firsttime, 2000)
)with(
type='kafka'
endpoint='xxx'
......
);
-- Hbase存储表
create table output(
window_start TIMESTAMP,
window_end TIMESTAMP,
id bigint,
contribution double
)with(
type='ALIHBASE'
endpoint='xxx'
......
);
-- 5秒钟窗口贡献度统计
CREATE VIEW group_view AS
SELECT id,TUMBLE_START(firsttime, INTERVAL '5' SECOND) AS start_time,(0.8*count+0.8*timegap+positive) as contribution
FROM input
GROUP BY id, TUMBLE(firsttime, INTERVAL '1' MINUTE);-- 统计每5秒 top5 贡献值ID,并输出
insert into output
select
(SELECTid, firsttime, contribution,ROW_NUMBER() OVER (PARTITION BY firsttime ORDER BY contribution DESC) as rownum,FROMgroup_view
)
WHERE rownum <&#61; 5;


注意事项

利用 Flink 构建一个高效健壮的流数据计算系统&#xff0c;我们还需要注意以下方面。
  • 需要合理的设置时间窗口&#xff0c;即需要保证Flink的计算窗口合理地统计到热度最高的话题&#xff0c;理论上Flink的窗口可以是无界的&#xff0c;也可以是很小的时间窗口&#xff0c;但是合理的窗口大小设计对业务逻辑的影响。
  • 虽然本文案例中&#xff0c;我们只是把 (近) 实时计算结果打印出来&#xff0c;但是实际上很多时候这些结果会被保存到数据库,HDFS, 或者发送回 Kafka, 以供其他系统利用这些数据做进一步的业务处理&#xff0c;Flink可以直接实现这些功能。
  • 由于流计算对实时性要求很高&#xff0c;所以任何由于 JVM Full GC 引起的系统暂停都是不可接受的。Flink 采用类似 DBMS 的 sort 和 join 算法&#xff0c;直接操作二进制数据&#xff0c;从而使序列化/反序列化带来的开销达到最小。所以 Flink 的内部实现更像 C/C&#43;&#43; 而非 Java。如果需要处理的数据超出了内存限制&#xff0c;则会将部分数据存储到硬盘上。如果要操作多块MemorySegment就像操作一块大的连续内存一样&#xff0c;Flink会使用逻辑视图&#xff08;AbstractPagedInputView&#xff09;来方便操作。
  • Flink内部支持exactly-once&#xff0c;要想达到端到端&#xff08;Soruce到Sink&#xff09;的exactly-once&#xff0c;需要Blink外部Soruce和Sink的支持&#xff0c;比如Source要支持精准的offset&#xff0c;Sink要支持两阶段提交&#xff0c;也就是继承TwoPhaseCommitSinkFunction。
  • Flink中当所有输入的barrier没有完全到来的时候&#xff0c;早到来的event在exactly-once的情况向会进行缓存&#xff08;不进行处理&#xff09;&#xff0c;而at-least-once的模式下即使所有输入的barrier没有完全到来的时候&#xff0c;早到来的event也会进行处理。也就是说对于at-least-once模式下&#xff0c;对于下游节点而言&#xff0c;本来数据属于checkpoint n的数据在checkpoint n-1里面也可能处理过了。所以我们建议&#xff0c;Flink的checkpoint模式设置为exactl-once模式。

结束语

本文包含了集成Flink和 Kafka 分布式消息系统的基本知识&#xff0c;但是需要指出的是&#xff0c;在实际问题中&#xff0c;我们可能面临更多的问题&#xff0c;如性能优化&#xff0c;内存不足&#xff0c;以及其他未曾遇到的问题。希望通过本文的阅读&#xff0c;读者能对使用 Flink SQL 和 Kafka 构建实时数据处理系统有一个基本的认识&#xff0c;为读者进行更深入的研究提供一个参考依据。读者在阅读本文的时候发现任何问题或者有任何建议&#xff0c;请不吝赐教&#xff0c;留下您的评论&#xff0c;我会及时回复。希望我们可以一起讨论&#xff0c;共同进步。




推荐阅读
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 以Flink为例,消除流处理常见的六大谬见
    以Flink为例,消除流处理常见的六大谬见 ... [详细]
  • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
    报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • Flink(三)IDEA开发Flink环境搭建与测试
    一.IDEA开发环境1.pom文件设置1.8 ... [详细]
  • 在多年使用Java 8进行新应用开发和现有应用迁移的过程中,我总结了一些非常实用的技术技巧。虽然我不赞同“最佳实践”这一术语,因为它可能暗示了通用的解决方案,但这些技巧在实际项目中确实能够显著提升开发效率和代码质量。本文将深入解析并探讨这四大高级技巧的具体应用,帮助开发者更好地利用Java 8的强大功能。 ... [详细]
  • Spring框架的核心组件与架构解析 ... [详细]
  • 作为140字符的开创者,Twitter看似简单却异常复杂。其简洁之处在于仅用140个字符就能实现信息的高效传播,甚至在多次全球性事件中超越传统媒体的速度。然而,为了支持2亿用户的高效使用,其背后的技术架构和系统设计则极为复杂,涉及高并发处理、数据存储和实时传输等多个技术挑战。 ... [详细]
  • ActiveMQ是由Apache开发的一款广受欢迎且功能强大的开源消息中间件。作为完全符合JMS 1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范已问世多年,但ActiveMQ依然保持了其在消息队列领域的领先地位。本文将带你初步了解ActiveMQ的核心概念及其应用场景,帮助你快速入门这一重要的消息传递技术。 ... [详细]
  • Flink1.10定义UDAGG遇到SQL
    按照以下代码测试定义的UDAGG会一直出现org.apache.flink.table.api.ValidationException:SQLvalidationfailed.nu ... [详细]
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
  • 前言本篇为大家总结社区多人合作常见的场景和对应的git操作命令。本篇非新手教程,阅读本篇前需具备Git基础知识。Git入门教程请参考https://www ... [详细]
author-avatar
ruanbaoshuijingkou
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有