
Flink Connector Kafka (Part 1)


1. Preface

I originally planned to write up Flink deployment options (standalone, YARN, Kubernetes), but that document lives on my company's intranet and cannot be taken outside. So I will skip deployment for now; if you have deployment questions, feel free to comment or message me. Let's go straight to Flink-Connector-Kafka.

2. Overview

Flink provides a Kafka connector for reading from and writing to Kafka topics. The Flink Kafka consumer integrates with Flink's checkpointing mechanism to provide exactly-once semantics for Flink's own processing (note: this is not an end-to-end guarantee). To achieve this, Flink does not rely on Kafka's consumer-group offset tracking; instead, it tracks the offsets internally as part of its checkpoints.

Flink's Kafka consumer is called FlinkKafkaConsumer08 (or 09 for Kafka 0.9.0.x versions, etc., or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions). It can consume one or more topics, and its constructor takes at least 3-4 arguments; see the code below (note: zookeeper.connect is only needed for Kafka 0.8).

Note: any configuration option supported by the Kafka consumer can also be set through the same properties.

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

3. Deserialization

The consumer reads binary byte messages from Kafka and needs to deserialize them into Flink Java/Scala objects. The DeserializationSchema interface lets the user specify such a schema: its deserialize(byte[] message) method is called for every Kafka message to perform the deserialization. The interface looks like this:

package org.apache.flink.api.common.serialization;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

import java.io.IOException;
import java.io.Serializable;

/**
 * The deserialization schema describes how to turn the byte messages delivered by certain
 * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
 * processed by Flink.
 *
 * <p>In addition, the DeserializationSchema describes the produced type ({@link #getProducedType()}),
 * which lets Flink create internal serializers and structures to handle the type.
 *
 * <p>Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which
 * takes care of producing the return type information automatically.
 *
 * <p>A DeserializationSchema must be {@link Serializable} because its instances are often part of
 * an operator or transformation function.
 *
 * @param <T> The type created by the deserialization schema.
 */
@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

   /**
    * Deserializes the byte message.
    *
    * @param message The message, as a byte array.
    *
    * @return The deserialized message as an object (null if the message cannot be deserialized).
    */
   T deserialize(byte[] message) throws IOException;

   /**
    * Method to decide whether the element signals the end of the stream. If
    * true is returned the element won't be emitted.
    *
    * @param nextElement The element to test for the end-of-stream signal.
    * @return True, if the element signals end of stream, false otherwise.
    */
   boolean isEndOfStream(T nextElement);
}

4. A digression: Flink serialization and deserialization

flink-core ships a very simple string serialization/deserialization schema by default (default charset: UTF-8):

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Very simple serialization schema for strings.
 *
 * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
 */
@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {

   private static final long serialVersionUID = 1L;

   /** The charset to use to convert between strings and bytes.
    * The field is transient because we serialize a different delegate object instead. */
   private transient Charset charset;

   /**
    * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
    */
   public SimpleStringSchema() {
      this(StandardCharsets.UTF_8);
   }

   /**
    * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
    *
    * @param charset The charset to use to convert between strings and bytes.
    */
   public SimpleStringSchema(Charset charset) {
      this.charset = checkNotNull(charset);
   }

   /**
    * Gets the charset used by this schema for serialization.
    * @return The charset used by this schema for serialization.
    */
   public Charset getCharset() {
      return charset;
   }

   // ------------------------------------------------------------------------
   //  Kafka Serialization
   // ------------------------------------------------------------------------

   @Override
   public String deserialize(byte[] message) {
      return new String(message, charset);
   }

   @Override
   public boolean isEndOfStream(String nextElement) {
      return false;
   }

   @Override
   public byte[] serialize(String element) {
      return element.getBytes(charset);
   }

   @Override
   public TypeInformation<String> getProducedType() {
      return BasicTypeInfo.STRING_TYPE_INFO;
   }

   // ------------------------------------------------------------------------
   //  Java Serialization
   // ------------------------------------------------------------------------

   private void writeObject(ObjectOutputStream out) throws IOException {
      out.defaultWriteObject();
      out.writeUTF(charset.name());
   }

   private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      String charsetName = in.readUTF();
      this.charset = Charset.forName(charsetName);
   }
}
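As the two constructors above show, the schema is not limited to UTF-8. A minimal usage sketch for a topic that carries text in another encoding, reusing the properties from the earlier example (the topic name and the ISO-8859-1 charset are just assumptions for illustration):

// hypothetical topic whose messages are ISO-8859-1 encoded text
FlinkKafkaConsumer08<String> legacyConsumer = new FlinkKafkaConsumer08<>(
      "legacy-topic",
      new SimpleStringSchema(StandardCharsets.ISO_8859_1),
      properties);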

The official examples also provide a KafkaEventSchema class, a custom serialization/deserialization schema for a KafkaEvent POJO:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
/**
* The serialization schema for the {@link KafkaEvent} type. This class defines how to transform a
* Kafka record's bytes to a {@link KafkaEvent}, and vice-versa.
*/
public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {

   private static final long serialVersionUID = 6154188370181669758L;

   @Override
   public byte[] serialize(KafkaEvent event) {
      return event.toString().getBytes();
   }

   @Override
   public KafkaEvent deserialize(byte[] message) throws IOException {
      return KafkaEvent.fromString(new String(message));
   }

   @Override
   public boolean isEndOfStream(KafkaEvent nextElement) {
      return false;
   }

   @Override
   public TypeInformation<KafkaEvent> getProducedType() {
      return TypeInformation.of(KafkaEvent.class);
   }
}
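The KafkaEvent class referenced above is a POJO from the same examples package and is not reproduced in this post. As a rough sketch of what such a POJO could look like (the field names and the comma-separated string format are assumptions for illustration, not the exact official class):

public class KafkaEvent {

   // assumed fields for illustration; the real example class may differ
   private String word;
   private int frequency;
   private long timestamp;

   public KafkaEvent() {}

   public KafkaEvent(String word, int frequency, long timestamp) {
      this.word = word;
      this.frequency = frequency;
      this.timestamp = timestamp;
   }

   // parses the same comma-separated form produced by toString()
   public static KafkaEvent fromString(String eventStr) {
      String[] split = eventStr.split(",");
      return new KafkaEvent(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
   }

   @Override
   public String toString() {
      return word + "," + frequency + "," + timestamp;
   }
}

The key point is that toString() and fromString() define a stable text representation that the schema's serialize/deserialize methods rely on.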

In short: if the data source does not have a fixed format, SimpleStringSchema is sufficient; otherwise, you can implement a POJO (and a matching schema) for your fixed data format.

5. Kafka Consumers Start Position Configuration

This needs little explanation; the demo below is self-explanatory.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);

myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
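Besides the four calls above, the consumer can also be told to start from specific per-partition offsets via setStartFromSpecificOffsets; a minimal sketch assuming a three-partition topic named "topic" (the offset values are arbitrary examples):

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

Partitions for which no offset is provided fall back to the default group-offsets behaviour.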

6. Kafka Consumers and Fault Tolerance

With Flink checkpointing enabled, the checkpoints consistently store the Kafka offsets together with the state of the other operations. If the job fails, Flink restores the latest checkpoint and, after restart, resumes consuming Kafka from the offsets recorded in that checkpoint.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000);

7. Kafka Consumers Topic and Partition Discovery

By default, partition discovery is disabled. To enable it, set flink.partition-discovery.interval-millis to a non-negative value in the provided properties; the value is the discovery interval in milliseconds.
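A minimal sketch of turning discovery on via the properties passed to the consumer constructor (the 30-second interval is just an example value):

// check for newly created partitions of the subscribed topics every 30 seconds
properties.setProperty("flink.partition-discovery.interval-millis", "30000");
// the properties object is then handed to the consumer constructor as in the earlier examples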

8. Kafka Consumers Offset Committing Behaviour Configuration

The Flink Kafka Consumer allows configuring how offsets are committed back to Kafka brokers (or to ZooKeeper in 0.8). Note that these committed offsets are not used for any fault-tolerance guarantee; they only expose the consumer's progress for monitoring purposes. The committing behaviour is as follows. With checkpointing disabled, offset committing relies on the Kafka consumer's periodic automatic commit (enable.auto.commit, or auto.commit.enable for Kafka 0.8, together with auto.commit.interval.ms). With checkpointing enabled, offsets are committed when checkpoints complete; this is controlled by setCommitOffsetsOnCheckpoints(boolean), which is enabled by default, and the consumer's automatic commit is then forcibly disabled, as the following source code shows:

// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
// this overwrites whatever setting the user configured in the properties
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
   properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
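Conversely, if you do not want offsets committed back to Kafka/ZooKeeper even with checkpointing on (for example, to keep external monitoring untouched), the behaviour can be switched off on the consumer instance; a minimal sketch:

// disables committing offsets back to Kafka/ZooKeeper on completed checkpoints;
// Flink's own checkpointed offsets, and hence fault tolerance, are unaffected
myConsumer.setCommitOffsetsOnCheckpoints(false);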

9. How Flink subtasks map to Kafka partitions (an important point)

This point is particularly important, and the official docs do not spell it out. First, the assignment algorithm:

/**
 * @param partition the Kafka partition
 * @param numParallelSubtasks total number of parallel subtasks
 *
 * @return index of the target subtask that the Kafka partition should be assigned to.
 */
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
   int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
   // here, the assumption is that the id of Kafka partitions are always ascending
   // starting from 0, and therefore can be used directly as the offset clockwise from the start index
   return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

The algorithm itself is simple. For example, suppose a topic has three partitions, the Flink Kafka consumer runs with a parallelism of 3, and the topic name happens to hash to a start index of 1; then:

partition 0 is assigned to consumer subtask 1;

partition 1 is assigned to consumer subtask 2;

partition 2 is assigned to consumer subtask 0.

Since both partitions and subtasks are numbered from 0, I personally think the algorithm could just as well be simplified to:

return (partition.getPartition()) % numParallelSubtasks;

The most sensible setup is to make the Flink Kafka consumer parallelism equal to the number of Kafka partitions: consumption stays efficient and no resources are wasted.
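As a quick sanity check of the mapping above, here is a small self-contained sketch that evaluates the same formula for a hypothetical topic name and a parallelism of 3 (the actual start index, and hence the concrete mapping, depends on the topic name's hashCode()):

public class AssignmentCheck {

   // same formula as the assign() method above, inlined for illustration
   static int assign(String topic, int partition, int numParallelSubtasks) {
      int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
      return (startIndex + partition) % numParallelSubtasks;
   }

   public static void main(String[] args) {
      // hypothetical topic name; substitute your own topic to see its real mapping
      for (int p = 0; p < 3; p++) {
         System.out.println("partition " + p + " -> subtask " + assign("my-topic", p, 3));
      }
   }
}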

10. Summary

That wraps up the basics of using the Flink-connector-kafka consumer. Questions and feedback are welcome.

