序
本文简单介绍下spring-cloud-stream-binder-kafka的一些属性配置。
maven
stream属性
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/ChannelBindingServiceProperties.java
spring:cloud:stream:instanceIndex: 0 ##支持环境变量INSTANCE_INDEX## The instance index of the application: a number from 0 to instanceCount-1. Used for partitioning and with KafkainstanceCount: 1 ## The number of deployed instances of an application. Must be set for partitioning and if using Kafka.## used to partition data across different consumers.
Topic在逻辑上可以被认为是一个queue。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得 Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储 这个partition的所有消息和索引文件。partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
同一个partition内的消息只能被同一个组中的一个consumer消费。
当消费者数量多于partition的数量时,多余的消费者空闲。
消费者少于和等于partition的数量时,会出现多个partition对应一个消费者的情况,个别消费者消费量会比其他的多。
instanceCount主要是consumer用的,一般小于或等于topic的partition数量,主要用作消费者的消费分区用。
bingdings属性
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/BindingProperties.java
spring:cloud:stream:bindings:output:destination: event-democontent-type: text/plain#group: test ##consumer属性#producer:#consumer:
producer
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ProducerProperties.java
spring:cloud:stream:bindings:output:destination: event-democontent-type: text/plainproducer:partitionCount: 1headerModepartitionKeyExtractorClass: org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClasspartitionSelectorClass: org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClassheaderMode: raw
- kafka producer扩展属性
spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/kafka/KafkaProducerProperties.java
spring:cloud:stream:bindings:output:destination: event-democontent-type: text/plainproducer:bufferSize: 16384maxRequestSize: 1048576sync: truebatchTimeout: 0
consumer
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ConsumerProperties.java
spring:cloud:stream:bindings:input:destination: event-democontent-type: text/plainconsumer:concurrency: 1 ## The concurrency of the inbound consumer.partitioned: false ## Whether the consumer receives data from a partitioned producer.Default: false.headerMode: raw
- kafka consumer扩展属性
spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/kafka/KafkaConsumerProperties.java
spring:cloud:stream:bindings:input:destination: event-democontent-type: text/plainconsumer:autoCommitOffset: falseresetOffsets: truestartOffset: earliestenableDlq: falserecoveryInterval: 5000
原生api
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);Map
这里头topicCountMap告诉Kafka我们在Consumer中将用多少个线程来消费该topic。topicCountMap的key是topic name,value针对该topic是线程的数量。
小结
整体的话,spring cloud stream自己抽象了一部分,但是有个硬伤就是spring.cloud.stream.instanceIndex这个不大友好,这样就造成服务的实例是有状态的了,在基于docker部署起来比较麻烦,还不如直接原生api。如果partition不多,或者每个consumer性能强悍的话,那么至少部署两个,配置起来也还可以接受。
doc
- spring-cloud-stream-binder-kafka-docs
- spring-cloud-stream-docs
- SpringCloudStream 构建消息驱动的微服务框架
- kafka中partition和消费者对应关系
- kafka学习(四)-Topic & Partition