热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

自定义DateTimeBucket

转载自:https:blog.csdn.netu010259977articledetails88683503Flink的StreamingFileSink自定义D

转载自:https://blog.csdn.net/u010259977/article/details/88683503

Flink的StreamingFileSink自定义DateTimeBucket

 

用flink消费kafka内容,通过清洗、转换、过滤后,要sink到parquet文件,需要按照事件的event进行分区生产需要写入的文件夹,如event1的发生时间在2018-03-19,而event2的发生时间在2018-03-20,这就涉及到extract它的eventtime,并生产parquet文件的bucktId,具体代码如下:

 


  1. /*

  2. * Licensed to the Apache Software Foundation (ASF) under one

  3. * or more contributor license agreements. See the NOTICE file

  4. * distributed with this work for additional information

  5. * regarding copyright ownership. The ASF licenses this file

  6. * to you under the Apache License, Version 2.0 (the

  7. * "License"); you may not use this file except in compliance

  8. * with the License. You may obtain a copy of the License at

  9. *

  10. * http://www.apache.org/licenses/LICENSE-2.0

  11. *

  12. * Unless required by applicable law or agreed to in writing, software

  13. * distributed under the License is distributed on an "AS IS" BASIS,

  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

  15. * See the License for the specific language governing permissions and

  16. * limitations under the License.

  17. */

  18.  
  19. package com.hellobike.realtimeplatform.utils;

  20.  
  21. import com.hellobike.realtimeplatform.model.AccessLog;

  22. import org.apache.flink.annotation.PublicEvolving;

  23. import org.apache.flink.core.io.SimpleVersionedSerializer;

  24. import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;

  25. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

  26. import org.apache.flink.util.Preconditions;

  27.  
  28. import java.text.ParseException;

  29. import java.text.SimpleDateFormat;

  30. import java.time.Instant;

  31. import java.time.ZoneId;

  32. import java.time.format.DateTimeFormatter;

  33. import java.util.Calendar;

  34.  
  35. /**

  36. * A {@link BucketAssigner} that assigns to buckets based on current system time.

  37. *

  38. *

  39. *

    The {@code DateTimeBucketer} will create directories of the following form:

  40. * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path

  41. * that was specified as a base path when creating the

  42. * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.

  43. * The {@code dateTimePath} is determined based on the current system time and the

  44. * user provided format string.

  45. *

  46. *

  47. *

    {@link DateTimeFormatter} is used to derive a date string from the current system time and

  48. * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling

  49. * files will have a granularity of hours.

  50. *

  51. *

    Example:

  52. *

  53. *

    {@code

  54. * BucketAssigner bucketAssigner = new DateTimeBucketAssigner("yyyy-MM-dd--HH");

  55. * }

  56. *

  57. *

    This will create for example the following bucket path:

  58. * {@code /base/1976-12-31-14/}

  59. */

  60. @PublicEvolving

  61. public class DateTimeBucketWithPartitionAssigner implements BucketAssigner {

  62.  
  63. private static final long serialVersionUID = 1L;

  64.  
  65. private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

  66.  
  67. private final String formatString;

  68.  
  69. private final ZoneId zoneId;

  70.  
  71. private transient DateTimeFormatter dateTimeFormatter;

  72.  
  73. /**

  74. * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.

  75. */

  76. public DateTimeBucketWithPartitionAssigner() {

  77. this(DEFAULT_FORMAT_STRING);

  78. }

  79.  
  80. /**

  81. * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.

  82. *

  83. * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine

  84. * the bucket id.

  85. */

  86. public DateTimeBucketWithPartitionAssigner(String formatString) {

  87. this(formatString, ZoneId.systemDefault());

  88. }

  89.  
  90. /**

  91. * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.

  92. *

  93. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.

  94. */

  95. public DateTimeBucketWithPartitionAssigner(ZoneId zoneId) {

  96. this(DEFAULT_FORMAT_STRING, zoneId);

  97. }

  98.  
  99. /**

  100. * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.

  101. *

  102. * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine

  103. * the bucket path.

  104. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.

  105. */

  106. public DateTimeBucketWithPartitionAssigner(String formatString, ZoneId zoneId) {

  107. this.formatString = Preconditions.checkNotNull(formatString);

  108. this.zoneId = Preconditions.checkNotNull(zoneId);

  109. }

  110.  
  111. @Override

  112. public String getBucketId(IN element, Context context) {

  113. long eventTime = 0L;

  114. if (dateTimeFormatter == null) {

  115. dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);

  116. }

  117. if (element instanceof AccessLog) {

  118. if (-1 == ((AccessLog) element).getParseResult() && "".equals(((AccessLog) element).getEventTimestamp())) {

  119. return "pt=errorTime";

  120. }

  121.  
  122. eventTime = Long.valueOf(((AccessLog) element).getEventTimestamp());

  123.  
  124. return "pt=" + dateTimeFormatter.format(Instant.ofEpochMilli(eventTime));

  125. } else {

  126. return "pt=errorObjects";

  127. }

  128. }

  129.  
  130.  
  131. @Override

  132. public SimpleVersionedSerializer getSerializer() {

  133. return SimpleVersionedStringSerializer.INSTANCE;

  134. }

  135.  
  136. @Override

  137. public String toString() {

  138. return "DateTimeBucketAssigner{" +

  139. "formatString='" + formatString + '\'' +

  140. ", zoneId=" + zoneId +

  141. '}';

  142. }

  143. }

指定自定义的DateTime Assigner就可以实现基于event time分桶写入parquet文件

 


推荐阅读
  • 本文详细介绍了如何在 Ubuntu 14.04 系统上搭建仅使用 CPU 的 Caffe 深度学习框架,包括环境准备、依赖安装及编译过程。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • Spring Security基础配置详解
    本文详细介绍了Spring Security的基础配置方法,包括如何搭建Maven多模块工程以及具体的安全配置步骤,帮助开发者更好地理解和应用这一强大的安全框架。 ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 基于Web的Kafka管理工具Kafkamanager首次访问Web界面的详细配置指南(附图解)
    首次访问Kafkamanager Web界面时,需要对Kafka集群进行配置。这一过程相对简单,用户只需依次点击【Cluster】>【Add Cluster】,按照提示完成相关设置即可。本文将通过图文并茂的方式,详细介绍每一步的配置步骤,帮助用户快速上手Kafkamanager。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Kafka 是由 Apache 软件基金会开发的高性能分布式消息系统,支持高吞吐量的发布和订阅功能,主要使用 Scala 和 Java 编写。本文将深入解析 Kafka 的安装与配置过程,为程序员提供详尽的操作指南,涵盖从环境准备到集群搭建的每一个关键步骤。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 在Linux系统中,原本已安装了多个版本的Python 2,并且还安装了Anaconda,其中包含了Python 3。本文详细介绍了如何通过配置环境变量,使系统默认使用指定版本的Python,以便在不同版本之间轻松切换。此外,文章还提供了具体的实践步骤和注意事项,帮助用户高效地管理和使用不同版本的Python环境。 ... [详细]
  • Storm集成Kakfa
    一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ... [详细]
  • 以Flink为例,消除流处理常见的六大谬见
    以Flink为例,消除流处理常见的六大谬见 ... [详细]
  • 两种方式实现Flink异步IO查询Mysql
    如官网所描述的Flink支持两种方式实现异步IO查询外部系统http ... [详细]
  • 本文详细介绍了如何在最新版本的Xcode中重命名iOS项目,包括项目名称、应用名称及相关的文件夹和配置文件。通过本文,开发者可以轻松完成项目的重命名工作。 ... [详细]
  • Flink1.10定义UDAGG遇到SQL
    按照以下代码测试定义的UDAGG会一直出现org.apache.flink.table.api.ValidationException:SQLvalidationfailed.nu ... [详细]
author-avatar
风清淡雅的梦
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有