热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

干货|Kafka在大数据环境中的应用

干货|Kafka在大数据环境中的应用

干货|Kafka在大数据环境中的应用

我们生活在一个数据爆炸的时代,数据的巨量增长给我们的业务处理带来了压力,同时巨量的数据也给我们带来了十分可观的财富。随着大数据将各个行业用户、运营商、服务商的数据整合进大数据环境,或用户取用大数据环境中海量的数据,业务平台间的消息处理将变得尤为复杂。如何高效地采集、使用数据,如何减轻各业务系统的压力,也变得越来越突出。在早期的系统实现时,业务比较简单。即便是数据量、业务量比较大,大数据环境也能做出处理。但是随着接入的系统增多,数据量、业务量增大,大数据环境、业务系统都可出现一定的瓶颈。下面我们看几个场景。

场景一:我们开发过一个设备信息挖掘平台。这个平台需要实时将采集互联网关采集到的路由节点的状态信息存入数据中心。通常一个网关一次需要上报几十甚至几百个变化的路由信息。全区有几万个这种互联网关。当信息采集平台将这些变化的数据信息写入或更新到数据库时候,会给数据库代理非常大的压力,甚至可以直接将数据库搞挂掉。这就对我们的数据采集系统提出了很高的要求。如何稳定高效地把消息更新到数据库这一要求摆了出来。

场景二:数据中心处理过的数据需要实时共享给几个不同的机构。我们常采用的方法是将数据批量存放在数据采集机,分支机构定时来采集;或是分支机构通过JDBC、RPC、http或其他机制实时从数据中心获取数据。这两种方式都存在一定的问题,前者在于实时性不足,还牵涉到数据完整性问题;后者在于,当数据量很大的时候,多个分支机构同时读取数据,会对数据中心的造成很大的压力,也造成很大的资源浪费。

为了解决以上场景提出的问题,我们需要这样一个消息系统:

缓冲能力,系统可以提供一个缓冲区,当有大量数据来临时,系统可以将数据可靠的缓冲起来,供后续模块处理;

订阅、分发能力,系统可以接收消息可靠的缓存下来,也可以将可靠缓存的数据发布给使用者。

这就要我们找一个高吞吐的、能满足订阅发布需求的系统。

Kafka是一个分布式的、高吞吐的、基于发布/订阅的消息系统。利用kafka技术可以在廉价PC Server上搭建起大规模的消息系统。Kafka具有消息持久化、高吞吐、分布式、实时、低耦合、多客户端支持、数据可靠等诸多特点,适合在线和离线的消息处理。

使用kafka解决我们上述提到的问题。

干货|Kafka在大数据环境中的应用

互联网关采集到变化的路由信息,通过kafka的producer将归集后的信息批量传入kafka。Kafka按照接收顺序对归集的信息进行缓存,并加入待消费队列。Kafka的consumer读取队列信息,并一定的处理策略,将获取的信息更新到数据库。完成数据到数据中心的存储。

数据中心的数据需要共享时,kafka的producer先从数据中心读取数据,然后传入kafka缓存并加入待消费队列。各分支结构作为数据消费者,启动消费动作,从kafka队列读取数据,并对获取的数据进行处理。

Kafka生产的代码如下:

public void produce(){      
 
                   //生产消息预处理 
 
        produceInfoProcess();        
 
        pro.send(ProducerRecord,new Callback(){ 
 
                            @Override 
 
                            onCompletion() { 
 
                                     if (metadata == null) { 
 
                                               // 发送失败 
 
                                               failedSend(); 
 
                                     } else { 
 
                                               //发送成功!"  
 
                                               successedSend();      
 
} 
 
                            }                       
 
              });   
 
     }  

消息生产者根据需求,灵活定义produceInfoProcess()方法,对相关数据进行处理。并依据数据发布到kafka的情况,处理回调机制。在数据发送失败时,定义failedSend()方法;当数据发送成功时,定义successedSend()方法。

Kafka消费的代码如下:

public void consumer() { 
 
                       //配置文件 
 
            properties(); 
 
            //获取当前数据的迭代器 
 
            iterator = stream.iterator(); 
 
            while (iterator.hasNext()) { 
 
                //取出消息 
 
                MessageAndMetadata next = iterator.next(); 
 
                messageProcess(); 
 
                 }       
 
    }  

Kafka消费者会和kafka集群建立一个连接。从kafka读取数据,调用messageProcess()方法,对获取的数据灵活处理。

结论

Kafka的高吞吐能力、缓存机制能有效的解决高峰流量冲击问题。实践表明,在未将kafka引入系统前,当互联网关发送的数据量较大时,往往会挂起关系数据库,数据常常丢失。在引入kafka后,更新程序能够结合能力自主处理消息,不会引起数据丢失,关系型数据库的压力波动不会发生过于显著的变化,不会出现数据库挂起锁死现象。

依靠kafka的订阅分发机制,实现了一次发布,各分支依据需求自主订阅的功能。避免了各分支机构直接向数据中心请求数据,或者数据中心依次批量向分支机构传输数据以致实时性不足的情况。kafka提高了实时性,减轻了数据中心的压力,提高了效率。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 我们


推荐阅读
  • EPICS Archiver Appliance存储waveform记录的尝试及资源需求分析
    本文介绍了EPICS Archiver Appliance存储waveform记录的尝试过程,并分析了其所需的资源容量。通过解决错误提示和调整内存大小,成功存储了波形数据。然后,讨论了储存环逐束团信号的意义,以及通过记录多圈的束团信号进行参数分析的可能性。波形数据的存储需求巨大,每天需要近250G,一年需要90T。然而,储存环逐束团信号具有重要意义,可以揭示出每个束团的纵向振荡频率和模式。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 在Oracle11g以前版本中的的DataGuard物理备用数据库,可以以只读的方式打开数据库,但此时MediaRecovery利用日志进行数据同步的过 ... [详细]
  • ***byte(字节)根据长度转成kb(千字节)和mb(兆字节)**parambytes*return*publicstaticStringbytes2kb(longbytes){ ... [详细]
  • 精讲代理设计模式
    代理设计模式为其他对象提供一种代理以控制对这个对象的访问。代理模式实现原理代理模式主要包含三个角色,即抽象主题角色(Subject)、委托类角色(被代理角色ÿ ... [详细]
  • java多线程获取线程返回结果
    我们在使用java多线程编写相关业务代码时,往往有这样一种情况,某个线程依赖于其他线程执行结果。也就是说,我们需要在一个线程中获取另一个线程的信息。可以分为两种情况,一种是轮询,一 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 本文介绍了在Python中使用zlib模块进行字符串的压缩与解压缩的方法,并探讨了其在内存优化方面的应用。通过压缩存储URL等长字符串,可以大大降低内存消耗,虽然处理时间会增加,但是整体效果显著。同时,给出了参考链接,供进一步学习和应用。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 由于同源策略的限制,满足同源的脚本才可以获取资源。虽然这样有助于保障网络安全,但另一方面也限制了资源的使用。那么如何实现跨域呢,以下是实现跨域的一些方法。 ... [详细]
  • 开发笔记:图像识别基于主成分分析算法实现人脸二维码识别
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了图像识别基于主成分分析算法实现人脸二维码识别相关的知识,希望对你有一定的参考价值。 ... [详细]
  • Matlab 中的一些小技巧(2)
    1.Ctrl+D打开子程序  在MATLAB的Editor中,将输入光标放到一个子程序名称中间,然后按Ctrl+D可以打开该子函数的m文件。当然这个子程序要在路径列表中(或在当前工作路径中)。实际上 ... [详细]
author-avatar
范二小姐儿
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有