1. Preface
I originally planned to write up Flink deployment options (standalone, YARN, Kubernetes), but that document lives on my company's intranet and cannot be taken outside. So I will skip deployment for now; if you have deployment-related questions, feel free to leave a comment or message me. Let's go straight to Flink-Connector-Kafka.
2. Overview
Flink provides a Kafka connector for reading from and writing to Kafka topics. The Flink Kafka consumer integrates with Flink's checkpointing mechanism to provide exactly-once semantics for Flink's own processing (note: this is not an end-to-end guarantee). To achieve this, Flink does not rely on Kafka's consumer-group offset tracking; instead, it tracks and checkpoints the offsets internally.
3. FlinkKafkaConsumer
Flink's Kafka consumer is called FlinkKafkaConsumer08 (or FlinkKafkaConsumer09 for Kafka 0.9.0.x, etc., or simply FlinkKafkaConsumer for Kafka >= 1.0.0). It can consume one or more topics. The constructor takes at least three arguments: the topic(s) to read, a DeserializationSchema, and a Properties object. The code below shows it directly (note: zookeeper.connect is only required for Kafka 0.8).
Note: of course, any configuration option supported by the Kafka consumer can also be set in these properties.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
The consumer reads binary byte messages from Kafka, which have to be deserialized into Flink Java/Scala objects. The DeserializationSchema interface allows the user to specify such a schema; its deserialize(byte[] message) method is called for every Kafka message to perform the deserialization.
The full deserialization interface:
package org.apache.flink.api.common.serialization;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import java.io.IOException;
import java.io.Serializable;
/**
* The deserialization schema describes how to turn the byte messages delivered by certain
* data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
* processed by Flink.
*
* In addition, the DeserializationSchema describes the produced type ({@link #getProducedType()}),
* which lets Flink create internal serializers and structures to handle the type.
*
* Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which
* takes care of producing the return type information automatically.
*
* A DeserializationSchema must be {@link Serializable} because its instances are often part of
* an operator or transformation function.
*
* @param <T> The type created by the deserialization schema.
*/
@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
* Deserializes the byte message.
*
* @param message The message, as a byte array.
*
* @return The deserialized message as an object (null if the message cannot be deserialized).
*/
T deserialize(byte[] message) throws IOException;
/**
* Method to decide whether the element signals the end of the stream. If
* true is returned the element won't be emitted.
*
* @param nextElement The element to test for the end-of-stream signal.
* @return True, if the element signals end of stream, false otherwise.
*/
boolean isEndOfStream(T nextElement);
}
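As the Javadoc note above mentions, in most cases a custom schema can start from AbstractDeserializationSchema, which takes care of producing the return type information automatically. A minimal sketch, assuming a hypothetical MyEvent POJO with a fromBytes factory method (both are placeholders, not part of Flink or the original post):

import java.io.IOException;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

// Only deserialize(byte[]) needs to be implemented; the produced type information
// and isEndOfStream() are handled by the abstract base class.
public class MyEventDeserializationSchema extends AbstractDeserializationSchema<MyEvent> {

    private static final long serialVersionUID = 1L;

    @Override
    public MyEvent deserialize(byte[] message) throws IOException {
        // parse the raw bytes in whatever format the topic uses (JSON, CSV, ...)
        return MyEvent.fromBytes(message);
    }
}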
4. A side note on serialization and deserialization in Flink
flink-core ships with a very simple string serialization/deserialization schema, SimpleStringSchema (UTF-8 by default).
The official demo also includes a KafkaEventSchema class that implements serialization/deserialization for a custom KafkaEvent POJO. In short: if the source data has no fixed format, SimpleStringSchema is all you need; otherwise, implement a POJO for your fixed data format together with a matching schema. The sources of both classes are listed below, SimpleStringSchema first, then KafkaEventSchema.
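Whichever schema you choose, it is passed to the consumer constructor in exactly the same way as SimpleStringSchema was in the earlier example. A small sketch reusing the official demo's KafkaEventSchema (topic name and connection properties are placeholders):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// the stream now carries KafkaEvent objects instead of raw strings
DataStream<KafkaEvent> events = env
    .addSource(new FlinkKafkaConsumer08<>("topic", new KafkaEventSchema(), properties));

The full sources of the two schema classes follow.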
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Very simple serialization schema for strings.
*
* By default, the serializer uses "UTF-8" for string/byte conversion.
*/
@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
private static final long serialVersionUID = 1L;
/** The charset to use to convert between strings and bytes.
* The field is transient because we serialize a different delegate object instead */
private transient Charset charset;
/**
* Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
*/
public SimpleStringSchema() {
this(StandardCharsets.UTF_8);
}
/**
* Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
*
* @param charset The charset to use to convert between strings and bytes.
*/
public SimpleStringSchema(Charset charset) {
this.charset = checkNotNull(charset);
}
/**
* Gets the charset used by this schema for serialization.
* @return The charset used by this schema for serialization.
*/
public Charset getCharset() {
return charset;
}
// ------------------------------------------------------------------------
//  Kafka Serialization
// ------------------------------------------------------------------------
@Override
public String deserialize(byte[] message) {
return new String(message, charset);
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public byte[] serialize(String element) {
return element.getBytes(charset);
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
// ------------------------------------------------------------------------
//  Java Serialization
// ------------------------------------------------------------------------
private void writeObject (ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeUTF(charset.name());
}
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
String charsetName = in.readUTF();
this.charset = Charset.forName(charsetName);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
/**
* The serialization schema for the {@link KafkaEvent} type. This class defines how to transform a
* Kafka record's bytes to a {@link KafkaEvent}, and vice-versa.
*/
public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {
private static final long serialVersionUID = 6154188370181669758L;
@Override
public byte[] serialize(KafkaEvent event) {
return event.toString().getBytes();
}
@Override
public KafkaEvent deserialize(byte[] message) throws IOException {
return KafkaEvent.fromString(new String(message));
}
@Override
public boolean isEndOfStream(KafkaEvent nextElement) {
return false;
}
@Override
public TypeInformation<KafkaEvent> getProducedType() {
return TypeInformation.of(KafkaEvent.class);
}
}

5. Kafka Consumers Start Position Configuration
Not much needs to be said here; the demo makes it clear at a glance:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
6. Kafka Consumers and Fault Tolerance
With Flink checkpointing enabled, the checkpoints store the Kafka offsets together with the state of the other operations in a consistent way. If the job fails, Flink restores the most recent checkpoint and, based on the Kafka offsets recorded in it, resumes consuming from Kafka after restart.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000);
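If more control is needed, the checkpoint mode and pacing can be configured as well. A brief sketch, with illustrative values that are not from the original post:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// illustrative values: checkpoint every 5 seconds in exactly-once mode,
// leaving at least 500 ms between the end of one checkpoint and the start of the next
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);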
7. Kafka Consumers Topic and Partition Discovery
Partition discovery is disabled by default. To enable it, set flink.partition-discovery.interval-millis to a non-negative value in the properties config; it specifies the discovery interval in milliseconds.
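A minimal sketch of turning discovery on (the 10000 ms interval is just an example value):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// check for newly created partitions every 10 seconds
properties.setProperty("flink.partition-discovery.interval-millis", "10000");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);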
8. Kafka Consumers Offset Committing Behaviour Configuration
The Flink Kafka Consumer allows configuring how offsets are committed back to Kafka (or to ZooKeeper for 0.8). Offsets committed this way are not used for fault-tolerance guarantees; they only expose the consumer's progress for monitoring. The committing behaviour works as follows: with checkpointing disabled, offset committing relies on the Kafka consumer's automatic commit (enable.auto.commit, or auto.commit.enable for Kafka 0.8, together with auto.commit.interval.ms); with checkpointing enabled, offsets are committed as part of the checkpoint, controlled by setCommitOffsetsOnCheckpoints(boolean), which is enabled by default.
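For illustration, a minimal sketch of setting this flag explicitly on the consumer (the consumer construction mirrors the earlier examples):

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);

// commit the checkpointed offsets back to Kafka (ZooKeeper for 0.8) so that
// external monitoring can see the consumer's progress; true is the default
myConsumer.setCommitOffsetsOnCheckpoints(true);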
When offsets are committed on checkpoints, the consumer's automatic commit is forcibly disabled. This can be seen in the Flink source:

// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
// this overwrites whatever setting the user configured in the properties
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}

9. The mapping between Flink subtasks and Kafka partitions (an important point)
This point is particularly important and is not explained on the official website. Let's look at the assignment algorithm first:
/**
* @param partition the Kafka partition
* @param numParallelSubtasks total number of parallel subtasks
*
* @return index of the target subtask that the Kafka partition should be assigned to.
*/
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
// here, the assumption is that the id of Kafka partitions are always ascending
// starting from 0, and therefore can be used directly as the offset clockwise from the start index
return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

The algorithm is actually quite simple. Take a topic with three partitions and a Flink Kafka consumer parallelism of 3 as an example (with this topic's hash the start index happens to be 1): partition 0 maps to subtask 1, partition 1 maps to subtask 2, and partition 2 maps to subtask 0.

We know that both partitions and subtasks are numbered from 0, so in my opinion the algorithm could just as well be:

return (partition.getPartition()) % numParallelSubtasks;

The best approach is to make the Flink Kafka consumer parallelism equal to the number of Kafka partitions: that keeps consumption efficient without wasting resources.

10. Summary
That covers the basics of using the Flink-connector-kafka consumer. Questions and feedback are welcome.