热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka流处理平台

1.Kafka简介Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特
1. Kafka简介

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

Kafka具有以下特性:

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

Kafka的使用场景:

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm
  • 事件源

通过上面的介绍也可以看出:Kafka给自身的定位并不仅仅是一个消息系统,而是通过发布订阅消息机制实现的分布式流平台。

流平台有三个关键的能力:

  • 发布订阅记录流,和消息队列或者企业新消息系统类似。
  • 以可容错、持久的方式保存记录流
  • 当记录流产生时就进行处理

Kafka通常用于应用中的两种广播类型:

  • 在系统和应用间建立实时的数据管道,能够可信赖的获取数据。
  • 建立实时的流应用,可以处理或者响应数据流。
2. Kafka基本概念及延伸

2.1 基本概念

Producer:数据生产者

  • 消息和数据的生产者
  • 向Kafka的一个topic发布消息的进程或代码或服务

Consumer:数据消费者

  • 消息和数据的消费者
  • 向Kafka订阅数据(topic)并且处理其发布的消息的进程或代码或服务

Consumer Group:消费者组

  • 对于同一个topic,会广播给不同的Group
  • 一个Group中,只有一个Consumer可以消费该消息

Broker:服务节点

  • Kafka集群中的每个Kafka节点

Topic:主题

  • Kafka消息的类别
  • 对数据进行区分、隔离

Partition:分区

  • Kafka中数据存储的基本单元
  • 一个topic数据,会被分散存储到多个Partition
  • 一个Partition只会存在一个Broker上
  • 每个Partition是有序的

Replication:分区的副本

  • 同一个Partition可能会有多个Replication
  • 多个Replication之间数据是一样的

Replication Leader:副本的老大

  • 一个Partition的多个Replication上
  • 需要一个Leader负责该Partition上与Producer和Consumer交互

Replication Manager:副本的管理者

  • 负责管理当前Broker所有分区和副本的信息
  • 处理KafkaController发起的一些请求
  • 副本状态的切换
  • 添加、读取消息等

2.2 概念延伸

Partition:分区

  • 每一个Topic被切分为多个Partition
  • 消费者数目少于或等于Partition的数目
  • Broker Group中的每一个Broker保存Topic的一个或多个Partition
  • Consumer Group中的仅有一个Consumer读取Topic的一个或多个Partition,并且是惟一的Consumer

Replication:分区的副本

  • 当集群中有Broker挂掉的情况,系统可以主动地使Replication提供服务
  • 系统默认设置每一个Topic的Replication系数为1,可以在创建Topic时单独设置
  • Replication的基本单位是Topic的Partition
  • 所有的读和写都从Replication Leader进行,Replication Followers只是作为备份
  • Replication Followers必须能够及时复制Replication Leader的数据
  • 增加容错性与可扩展性
3. 基本结构

Kafka功能结构

 

Kafka数据流势

 

Kafka消息结构

  • Offset:当前消息所处于的偏移
  • Length:消息的长度
  • CRC32:校验字段,用于校验当前信息的完整性
  • Magic:很多分布式系统都会设计该字段,固定的数字,用于快速判定当前信息是否为Kafka消息
  • attributes:可选字段,消息的属性
  • Timestamp:时间戳
  • Key Length:Key的长度
  • Key:Key
  • Value Length:Value的长度
  • Value:Value
 4. Kafka安装部署

Kafka依赖于zookeeper实现分布式系统的协调,所以需要同时安装zookeeper。两个的安装包到官网下载。

4.1 zookeeper安装配置

在zookeeper解压后的目录下找到conf文件夹,进入后,复制文件zoo_sample.cfg,并命名为zoo.cfg。zoo.cfg中一共五个配置项,可以使用默认配置。

 

4.2 Kafka安装配置

进入kafka根目录下的config文件夹下,打开server.properties,修改如下配置项(一般默认即为如下,无需修改)

zookeeper.cOnnect=localhost:2181
broker.id=0
log.dirs=/tmp/kafka-logs

另外,config文件夹下也包含有zookeeper的配置文件,可以在其中设置配置项,启动zookeeper时引用这个配置文件,实现定制化。

Kafka的bin目录包含了大多数功能的启动脚本,可以通过它们控制Kafka的功能开启。

 启动Kafka

4.3 使用控制台操作生产者和消费者

创建Topic:sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-kafka-topic
查看Topic:sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181
启动生产者:sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-kafka-topic
启动消费者:sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-topic --from-beginning
生产消息:first message
生产消息:second message
 5. 代码示例

引入依赖pom.xml

xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0modelVersion>
    <parent>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-parentartifactId>
        <version>2.1.2.RELEASEversion>
        <relativePath/> 
    parent>
    <groupId>com.zanggroupId>
    <artifactId>kafkaartifactId>
    <version>0.0.1-SNAPSHOTversion>
    <name>kafkaname>
    <description>Demo project for Spring Bootdescription>

    <properties>
        <java.version>1.8java.version>
    properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-webartifactId>
        dependency>
        <dependency>
            <groupId>org.springframework.kafkagroupId>
            <artifactId>spring-kafkaartifactId>
        dependency>

        <dependency>
            <groupId>org.projectlombokgroupId>
            <artifactId>lombokartifactId>
            <optional>trueoptional>
        dependency>
        <dependency>
            <groupId>com.alibabagroupId>
            <artifactId>fastjsonartifactId>
            <version>1.2.36version>
        dependency>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-testartifactId>
            <scope>testscope>
        dependency>
        <dependency>
            <groupId>org.springframework.kafkagroupId>
            <artifactId>spring-kafka-testartifactId>
            <scope>testscope>
        dependency>
    dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-maven-pluginartifactId>
            plugin>
        plugins>
    build>

project>

相应实体

package com.zang.kafka.common;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;


/**
 * 〈消息实体〉
*/ @Getter @Setter @EqualsAndHashCode @ToString public class MessageEntity { /** * 标题 */ private String title; /** * 内容 */ private String body; }
package com.zang.kafka.common;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;


/**
 * 〈REST请求统一响应对象〉
*/ @Getter @Setter public class Response implements Serializable{ private static final long serialVersiOnUID= -1523637783561030117L; /** * 响应编码 */ private int code; /** * 响应消息 */ private String message; public Response(int code, String message) { this.code = code; this.message = message; } }
package com.zang.kafka.common;

/**
 * 〈错误编码〉
*/ public class ErrorCode { /** * 成功 */ public final static int SUCCESS = 200; /** * 失败 */ public final static int EXCEPTION = 500; }

生产者

package com.zang.kafka.producer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 〈生产者〉
 */
@Component
public class SimpleProducer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(String topic, String key, Object entity) {
        logger.info("发送消息入参:{}", entity);
        ProducerRecord record = new ProducerRecord<>(
                topic,
                key,
                JSON.toJSONString(entity)
        );

        long startTime = System.currentTimeMillis();
        ListenableFuture> future = this.kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback>() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error("消息发送失败:{}", ex);
            }

            @Override
            public void onSuccess(SendResult result) {
                long elapsedTime = System.currentTimeMillis() - startTime;

                RecordMetadata metadata = result.getRecordMetadata();
                StringBuilder record = new StringBuilder(128);
                record.append("message(")
                        .append("key = ").append(key).append(",")
                        .append("message = ").append(entity).append(")")
                        .append("send to partition(").append(metadata.partition()).append(")")
                        .append("with offset(").append(metadata.offset()).append(")")
                        .append("in ").append(elapsedTime).append(" ms");
                logger.info("消息发送成功:{}", record.toString());
            }
        });
    }

}

消费者

package com.zang.kafka.consumer;

import com.alibaba.fastjson.JSONObject;
import com.zang.kafka.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * 〈消费者〉
*/ @Component public class SimpleConsumer { private Logger logger = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = "${kafka.topic.default}") public void listen(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { //判断是否NULL Optional kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { //获取消息 Object message = kafkaMessage.get(); MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class); logger.info("接收消息Topic:{}", topic); logger.info("接收消息Record:{}", record); logger.info("接收消息Message:{}", messageEntity); } } }

控制器

package com.zang.kafka.controller;

import com.alibaba.fastjson.JSON;
import com.zang.kafka.common.ErrorCode;
import com.zang.kafka.common.MessageEntity;
import com.zang.kafka.common.Response;
import com.zang.kafka.producer.SimpleProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;

/**
 * 〈生产者〉
*/ @RestController @RequestMapping("/producer") public class ProducerController { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private SimpleProducer simpleProducer; @Value("${kafka.topic.default}") private String topic; private static final String KEY = "key";/** * 消息发送 * @param message * @return */ @PostMapping("/send") public Response sendKafka(@RequestBody MessageEntity message) { try { logger.info("kafka的消息:{}", JSON.toJSONString(message)); this.simpleProducer.send(topic, KEY, message); logger.info("kafka消息发送成功!"); return new Response(ErrorCode.SUCCESS,"kafka消息发送成功"); } catch (Exception ex) { logger.error("kafka消息发送失败:", ex); return new Response(ErrorCode.EXCEPTION,"kafka消息发送失败"); } } }

配置application.properties

##----------kafka配置
## TOPIC
kafka.topic.default=my-kafka-topic
# kafka地址
spring.kafka.bootstrap-servers=47.88.156.142:9092
# 生产者配置
spring.kafka.producer.retries=0
# 批量发送消息的数量
spring.kafka.producer.batch-size=4096
# 缓存容量
spring.kafka.producer.buffer-memory=40960
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消费者配置
spring.kafka.consumer.group-id=my
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.cOncurrency=3

启动类

package com.zang.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

}
6. Kafka的高级特性

6.1 消息事务

为什么要支持事务

  • 满足“读取-处理-写入”模式
  • 流处理需求的不断增强
  • 不准确的数据处理的容忍度不断降低

数据传输的事务定义

  • 最多一次:消息不会被重复发送,最多被传输一次,但也有可能一次不传输
  • 最少一次:消息不会被漏发送,最少被传输一次,但也有可能被重复传输
  • 精确的一次(Exactly once):不会漏传输也不会重复传输,每个消息都被传输一次且仅仅被传输一次,这是大家所期望的

事务保证

  • 内部重试问题:Procedure幂等处理
  • 多分区原子写入
  • 避免僵尸实例

    •  每个事务Procedure分配一个 transactionl. id,在进程重新启动时能够识别相同的Procedure实例
    •  Kafka增加了一个与transactionl.id相关的epoch,存储每个transactionl.id内部元数据
    •  一旦epoch被触发,任务具有相同的transactionl.id和更旧的epoch的Producer被视为僵尸,Kafka会拒绝来自这些Producer的后续事务性写入

6.2 零拷贝

零拷贝简介

  • 通过网络传输持久性日志块
  • 使用Java Nio channel.transforTo()方法实现
  • 底层使用Linux sendfile系统调用

文件传输到网络的公共数据路径

  • 第一次拷贝:操作系统将数据从磁盘读入到内核空间的页缓存
  • 第二次拷贝:应用程序将数据从内核空间读入到用户空间缓存中
  • 第三次拷贝:应用程序将数据写回到内核空间到socket缓存中
  • 第四次拷贝:操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出

零拷贝过程(指内核空间和用户空间的交互拷贝次数为零)

  • 第一次拷贝:操作系统将数据从磁盘读入到内核空间的页缓存
  • 将数据的位置和长度的信息的描述符增加至内核空间(socket缓存区)
  • 第二次拷贝:操作系统将数据从内核拷贝到网卡缓冲区,以便将数据经网络发出

 

 

 

来源:

慕课网课程:https://www.imooc.com/learn/1043

参考:

https://blog.csdn.net/liyiming2017/article/details/82790574

https://blog.csdn.net/YChenFeng/article/details/74980531


推荐阅读
  • 云原生应用最佳开发实践之十二原则(12factor)
    目录简介一、基准代码二、依赖三、配置四、后端配置五、构建、发布、运行六、进程七、端口绑定八、并发九、易处理十、开发与线上环境等价十一、日志十二、进程管理当 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • ejava,刘聪dejava
    本文目录一览:1、什么是Java?2、java ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 本文介绍了一个React Native新手在尝试将数据发布到服务器时遇到的问题,以及他的React Native代码和服务器端代码。他使用fetch方法将数据发送到服务器,但无法在服务器端读取/获取发布的数据。 ... [详细]
  • 统一知识图谱学习和建议:更好地理解用户偏好
    本文介绍了一种将知识图谱纳入推荐系统的方法,以提高推荐的准确性和可解释性。与现有方法不同的是,本方法考虑了知识图谱的不完整性,并在知识图谱中传输关系信息,以更好地理解用户的偏好。通过大量实验,验证了本方法在推荐任务和知识图谱完成任务上的优势。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 《树莓派开发实战(第2版)》——2.2 创建模型和运行推理:重回Hello World
    本节书摘来异步社区《概率编程实战》一书中的第2章,第2.2节,作者:【美】AviPfeffer(艾维费弗)&# ... [详细]
author-avatar
奋斗的筱清年
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有