热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka使用总结与生产消费Demo实现

什么是kafkaKafka官网自己的介绍是:一个可支持分布式的流平台。kafka官网介绍kafka三个关键能力:1.发布订阅记录流,类似于消息队列

什么是kafka

Kafka官网自己的介绍是:一个可支持分布式的流平台。
kafka官网介绍

 kafka三个关键能力:
     1.发布订阅记录流,类似于消息队列与企业信息系统
     2.以容错的持久方式存储记录流
     3.对流进行处理

kafka通常应用再两大类应用中:
    1.构建实时流数据管道,在系统或应用程序之间可靠地获取数据
    2.构建转换或响应数据流的实时流应用程序

kafka的一些基本概念:
    1.Kafka作为一个集群运行在一个或多个服务器上,这些服务器可以跨越多个数据中心。
    2.Kafka集群将记录流存储在称为topic的类别中。
    3.每个记录由一个键、一个值和一个时间戳组成。

kafka核心API:
    1.Producer API:允许应用程序将记录流发布到一个或多个topic。
    2.Consumer API:允许应用程序订阅一个或多个topic并处理生成给它们的记录流。
    3.Streams API:允许应用程序充当流处理器,使用来自一个或多个topic的输入流,
    并生成一个或多个输出topic的输出流,从而有效地将输入流转换为输出流。
    4.Connector API:允许构建和运行可重用的生产者或消费者,将topic连接到现有的应用程序或数据系统。
    例如,到关系数据库的连接器可能捕获对表的每个更改。

作为消息系统

传统消息传递有两类模型:消息队列、发布订阅。在消息队列中,一个消费者池可以从一个服务器读取数据,而每个记录都将被发送到其中一个服务器;在发布-订阅中,记录被广播给所有消费者。这两种模型各有优缺点:

    消息队列优缺点:
        它允许您在多个使用者实例上划分数据处理,这使您可以扩展处理。
        队列不是多订阅者的—一旦一个进程读取了它丢失的数据。

    发布订阅优缺点:
        Publish-subscribe允许您将数据广播到多个进程,
        但是由于每个消息都传递到每个订阅者,因此无法扩展处理。

作为消息传递系统,那么跟mq有什么区别呢?(RabbitMq\redis\RocketMq\ActiveMq)

RabbitMQ:
     遵循AMQP协议,由内在高并发的erlang语言开发,用在实时的对可靠性要求比较高的消息传递上.
     万级数据量,社区活跃度极高,可视化操作界面丰富。
     提供了全面的核心功能,是消息队列的优秀产品。
     因为是erlang语言开发,难以维护并且开发者很难二次开发。

Redis:
    redis的主要场景是内存数据库,作为消息队列来说可靠性太差,而且速度太依赖网络IO。
    在服务器本机上的速度较快,且容易出现数据堆积的问题,在比较轻量的场合下能够适用。

RocketMq:
    rocketMq几十万级别数据量,基于Java开发。是阿里巴巴开源的一个消息产品。
    应对了淘宝双十一考验,并且文档十分的完善,拥有一些其他消息队列不具备的高级特性,
    如定时推送,其他消息队列是延迟推送,如rabbitMq通过设置expire字段设置延迟推送时间。
    又比如rocketmq实现分布式事务,比较可靠的。RocketMq也是用过的唯一支持分布式事务的一款产品。

Kafka:
    kafka原本设计的初衷是日志统计分析,现在基于大数据的背景下也可以做运营数据的分析统计。
    kafka真正的大规模分布式消息队列,提供的核心功能比较少。基于zookeeper实现的分布式消息订阅。
    几十万级数据量级,比RokectMq更强。
    客户端和服务器之间的通信是通过一个简单的、高性能的、语言无关的TCP协议来完成的。

ActiveMq:
    Apache ActiveMQ™是最流行的开源、多协议、基于java的消息服务器。它支持行业标准协议,
    因此用户可以在各种语言和平台上选择客户端。可以使用来自C、c++、Python、. net等的连接性。
    使用通用的AMQP协议集成您的多平台应用程序。使用STOMP在websockets上交换web应用程序之间的消息。
    使用MQTT管理物联网设备。支持您现有的JMS基础结构及其他。ActiveMQ提供了支持任何messagi的强大功能
    和灵活性。

备注:因为该文章主要介绍kafka,所以上述只是简单罗列了一些特点,如果有兴趣的同学可以详细的分析一下,这些产品我后续都会专门写文章来归纳总结分析,在这里先简单带过。

为什么要用消息队列?

该部分是扩展内容,很多人包括我刚毕业那年使用消息队列,但别人问道我为啥用消息队列,我都没有一个很清晰的认识,所以在这里也说一下。希望给有需要的同学一些帮助。

那么为什么要使用消息队列呢?首先我们来回顾一下消息传递。前端而言,传统方式是通过全局变量来传递,后面有了数据总线的概念,再后来有相应的解决方案产品比如说vuex、redux、store等。对于后端来说,最先系统之间的通信,消息传递都非常依赖于通信对象彼此,高度耦合,后面有了一些产品来解决这些问题,比如说webservice.但这样的方式极其不友好,而且维护繁琐,职责难以分清,工作量增加,所以mq诞生后,基本解决了这些问题。

消息队列的引入是为了:

1.解耦:
    比如:A系统操作p,需要将消息传递给B、C两个系统,如果没有消息队列,那么A系统中需要给B发一条消息,
    又得给C发一条消息,然后有一天D、E、F系统说:A系统你也要给我发p的消息,这个时候A又得修改代码,
    发布上线,DEF才能正常接收消息。然后过了n天,C又说,不要给我发消息了,把给我发消息的部分去掉吧。
    A系统的开发人员又得哐哧哐哧的去掉,发布上线。这样日复一日,随着系统增多,接入和退出的操作增多,
    那么A系统需要频繁发布上线,降低了稳定性、可用时间、同时每次上线都需要测试跟踪测试,这里面的成本
    与风险不言而喻。而消息队列一旦引入,A不需要关心谁消费,谁退出消费,A只负责将消息放入队列即可,
    而其他系统只需要监听这个队列,就算其他系统退出,对A而言也是没有任何影响的,能够一直持续不断的
    提供服务,这难道不香吗?

2.异步
    比如说:传统方式发送消息给B、C、D,需要120ms,那么如果采用了消息队列,就可以大大降低耗时。但
    这些对于那些非必要的同步业务逻辑适用。

3.削峰
    传统模式下,请求直接进入到数据库,当峰值到达一定时,必然会挂掉。如果适用了中间件消息队列,那么就可以很好的保证系统正常提供服务,这也是秒杀系统中会常常谈到的限流、这样可以防止系统崩溃,提供系统可用性。

配置MAVEN

        
            org.apache.kafka
            kafka_2.12
            1.0.0
            provided
        
        
            org.apache.kafka
            kafka-clients
            1.0.0
        
        
            org.apache.kafka
            kafka-streams
            1.0.0
        

生产者

/**
 * @author chandlerHuang
 * @description @TODO
 * @date 2020/1/15
 */
public class KafkaProducerService implements Runnable {

    private final KafkaProducer producer;

    private final String topic;

    public KafkaProducerService(String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "绑定的外网IP:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer(props);
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        try {
            for(;;) {
                String messageStr="["+messageNo+"]:hello,boys!";
                producer.send(new ProducerRecord(topic, "Message", messageStr));
                //生产了100条就打印
                if(messageNo%100==0){
                    System.out.println("sendMessages:" + messageStr);
                }
                //生产1000条就退出
                if(messageNo%1000==0){
                    System.out.println("successCount:"+messageNo);
                    break;
                }
                messageNo++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    public static void main(String args[]) {
        KafkaProducerService test = new KafkaProducerService(TopicConstant.CHART_TOPIC);
        Thread thread = new Thread(test);
        thread.start();
    }
}

Kafka使用总结与生产消费Demo实现
Kafka使用总结与生产消费Demo实现

消费者

/**
 * @author chandlerHuang
 * @description @TODO
 * @date 2020/1/15
 */
public class KafkaConsumerService implements Runnable{

    private final KafkaConsumer consumer;
    private ConsumerRecords msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public KafkaConsumerService(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "绑定的外网IP:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.cOnsumer= new KafkaConsumer(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        try {
            for (;;) {
                msgList = consumer.poll(1000);
                if(null!=msgList&&msgList.count()>0){
                    for (ConsumerRecord record : msgList) {
                        //消费100条就打印 ,但打印的数据不一定是这个规律的
                        if(messageNo%100==0){
                            System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                        }
                        //当消费了1000条就退出
                        if(messageNo%1000==0){
                            break;
                        }
                        messageNo++;
                    }
                }else{
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
    public static void main(String args[]) {
        KafkaConsumerService test1 = new KafkaConsumerService(TopicConstant.CHART_TOPIC);
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}

Kafka使用总结与生产消费Demo实现

备注:上述demo编写过程中,发现报了一个Exception:Kafka java client 连接异常(org.apache.kafka.common.errors.TimeoutException: Failed to update metadata )...

kafka中需要配置server.文件:

advertised.listeners=PLAINTEXT://外网地址:9092

zookeeper.cOnnect=内网地址:2181

如果你是云服务器的话需要,在安全组设置对应端口开放,否则无法访问响应接口!


推荐阅读
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 深入解析CAS机制:全面替代传统锁的底层原理与应用
    本文深入探讨了CAS(Compare-and-Swap)机制,分析了其作为传统锁的替代方案在并发控制中的优势与原理。CAS通过原子操作确保数据的一致性,避免了传统锁带来的性能瓶颈和死锁问题。文章详细解析了CAS的工作机制,并结合实际应用场景,展示了其在高并发环境下的高效性和可靠性。 ... [详细]
  • 在《Cocos2d-x学习笔记:基础概念解析与内存管理机制深入探讨》中,详细介绍了Cocos2d-x的基础概念,并深入分析了其内存管理机制。特别是针对Boost库引入的智能指针管理方法进行了详细的讲解,例如在处理鱼的运动过程中,可以通过编写自定义函数来动态计算角度变化,利用CallFunc回调机制实现高效的游戏逻辑控制。此外,文章还探讨了如何通过智能指针优化资源管理和避免内存泄漏,为开发者提供了实用的编程技巧和最佳实践。 ... [详细]
  • 提升 Kubernetes 集群管理效率的七大专业工具
    Kubernetes 在云原生环境中的应用日益广泛,然而集群管理的复杂性也随之增加。为了提高管理效率,本文推荐了七款专业工具,这些工具不仅能够简化日常操作,还能提升系统的稳定性和安全性。从自动化部署到监控和故障排查,这些工具覆盖了集群管理的各个方面,帮助管理员更好地应对挑战。 ... [详细]
  • 尽管我们尽最大努力,任何软件开发过程中都难免会出现缺陷。为了更有效地提升对支持部门的协助与支撑,本文探讨了多种策略和最佳实践,旨在通过改进沟通、增强培训和支持流程来减少这些缺陷的影响,并提高整体服务质量和客户满意度。 ... [详细]
  • 【并发编程】全面解析 Java 内存模型,一篇文章带你彻底掌握
    本文深入解析了 Java 内存模型(JMM),从基础概念到高级特性进行全面讲解,帮助读者彻底掌握 JMM 的核心原理和应用技巧。通过详细分析内存可见性、原子性和有序性等问题,结合实际代码示例,使开发者能够更好地理解和优化多线程并发程序。 ... [详细]
  • 本文推荐了六款高效的Java Web应用开发工具,并详细介绍了它们的实用功能。其中,分布式敏捷开发系统架构“zheng”项目,基于Spring、Spring MVC和MyBatis技术栈,提供了完整的分布式敏捷开发解决方案,支持快速构建高性能的企业级应用。此外,该工具还集成了多种中间件和服务,进一步提升了开发效率和系统的可维护性。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 优化后的标题:PHP分布式高并发秒杀系统设计与实现
    PHPSeckill是一个基于PHP、Lua和Redis构建的高效分布式秒杀系统。该项目利用php_apcu扩展优化性能,实现了高并发环境下的秒杀功能。系统设计充分考虑了分布式架构的可扩展性和稳定性,适用于大规模用户同时访问的场景。项目代码已开源,可在Gitee平台上获取。 ... [详细]
  • 如何正确配置与使用日志组件:Log4j、SLF4J及Logback的连接与整合方法
    在当前的软件开发实践中,无论是开源项目还是日常工作中,日志框架都是不可或缺的工具之一。本文详细探讨了如何正确配置与使用Log4j、SLF4J及Logback这三个流行的日志组件,并深入解析了它们之间的连接与整合方法,旨在帮助开发者高效地管理和优化日志记录流程。 ... [详细]
  • 开发心得:利用 Redis 构建分布式系统的轻量级协调机制
    开发心得:利用 Redis 构建分布式系统的轻量级协调机制 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • 阿里巴巴Java后端开发面试:TCP、Netty、HashMap、并发锁与红黑树深度解析 ... [详细]
  • 在DB2数据库的性能调优与设计策略中,物理设计是关键环节。具体包括:1. 容器设计:采用条带化技术、裸设备以及支持并发I/O的配置,以提高数据访问效率。2. 存储方案:建议使用RAID5用于日志存储,以平衡成本和性能;而数据存储则推荐使用RAID10,确保高可靠性和读写性能。3. 系统配置:合理配置系统参数,优化内存管理和缓存策略,进一步提升整体性能。 ... [详细]
author-avatar
中国传媒大学一零播本更_822
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有