Author: 纽约纽约MrWaNg | Source: Internet | 2023-07-09 18:59
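Follow-up to the previous post: Flink FileSink custom output path (BucketingSink)
The previous post used BucketingSink to write to a custom output path. This post looks at StreamingFileSink, the newer connector that the community added as an improvement and recommends using instead.
StreamingFileSink takes a bit more work to set up, but it is also more flexible and more powerful, because you can plug in your own serialization (the Flink source contains examples you can use as a starting point).
StreamingFileSink offers two ways of writing files, forRowFormat and forBulkFormat. As the names suggest, one writes row by row and the other writes in bulk (block) format.
forRowFormat is the simpler one. The only encoder provided out of the box is SimpleStringEncoder, which writes text files in a charset you choose: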
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

val input: DataStream[String] = ...
val sink: StreamingFileSink[String] = StreamingFileSink
  // fixed base path; the default DateTimeBucketAssigner only adds hourly subdirectories (yyyy-MM-dd--HH)
  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
  .build()
input.addSink(sink)
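Because a row-format encoder handles one element at a time, it helps to see what SimpleStringEncoder actually writes: in Flink's source, its encode method writes element.toString in the configured charset and then a '\n'. The stdlib-only sketch below mimics that behavior so it can be run without Flink; SketchStringEncoder and RowFormatSketch are hypothetical names, not Flink classes.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.{Charset, StandardCharsets}

// Hypothetical stand-in for Flink's SimpleStringEncoder (NOT the Flink class):
// each element is encoded on its own as toString() in the given charset, then '\n'.
class SketchStringEncoder[T](charsetName: String) {
  private val charset: Charset = Charset.forName(charsetName)

  def encode(element: T, stream: OutputStream): Unit = {
    stream.write(element.toString.getBytes(charset))
    stream.write('\n'.toInt) // one element per line
  }
}

object RowFormatSketch {
  // Feeds every element through the encoder into one in-memory "part file"
  def encodeAll[T](elements: Seq[T], encoder: SketchStringEncoder[T]): String = {
    val out = new ByteArrayOutputStream()
    elements.foreach(e => encoder.encode(e, out))
    out.toString(StandardCharsets.UTF_8.name())
  }
}
```

For example, encoding Seq("a", "b") yields the text "a\nb\n": the encoder never sees neighboring elements, which is exactly what makes row format simple and also what limits it.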
Our actual goal, though, is to choose the output path based on the input data. For that we implement our own BucketAssigner, DayBucketAssigner:
import java.io.IOException
import java.nio.charset.StandardCharsets
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner

class DayBucketAssigner extends BucketAssigner[ObjectNode, String] {
  /**
   * The bucket ID becomes the subdirectory (under the base path) the element is written to.
   * @param element the incoming JSON record
   * @param context gives access to processing time, watermark and element timestamp
   * @return the day (yyyyMMdd) taken from the "date" field
   */
  override def getBucketId(element: ObjectNode, context: BucketAssigner.Context): String = {
    // context.currentProcessingTime() could be used instead to bucket by processing time
    // path() returns a MissingNode instead of null, so a missing "date" falls back to the default
    val day = element.path("date").asText("19790101000000").substring(0, 8)
    // for nested directories, return something like day + "/" + xxx
    day
  }

  override def getSerializer: SimpleVersionedSerializer[String] = StringSerializer

  /**
   * Based on: org.apache.flink.runtime.checkpoint.StringSerializer
   */
  object StringSerializer extends SimpleVersionedSerializer[String] {
    val VERSION = 77

    override def getVersion: Int = VERSION

    @throws[IOException]
    override def serialize(checkpointData: String): Array[Byte] =
      checkpointData.getBytes(StandardCharsets.UTF_8)

    @throws[IOException]
    override def deserialize(version: Int, serialized: Array[Byte]): String =
      if (version != VERSION) throw new IOException("version mismatch")
      else new String(serialized, StandardCharsets.UTF_8)
  }
}
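The two pieces of DayBucketAssigner can be checked without a Flink cluster. The sink stores bucket IDs in its checkpointed state through the serializer returned by getSerializer, which is why deserialize checks the version. The sketch below is a stdlib-only approximation under stated assumptions: a Map[String, String] stands in for the JSON ObjectNode, "date" is assumed to be a yyyyMMddHHmmss string, and DayBucketLogic and VersionedStringCodec are hypothetical names.

```scala
import java.io.IOException
import java.nio.charset.StandardCharsets

// Hypothetical, stdlib-only version of DayBucketAssigner.getBucketId.
// Assumption: a Map stands in for the ObjectNode, "date" is yyyyMMddHHmmss.
object DayBucketLogic {
  val DefaultDate = "19790101000000"

  // First 8 chars (yyyyMMdd) of "date"; falls back to the default when the
  // field is missing or too short to contain a full day.
  def bucketId(record: Map[String, String]): String = {
    val raw = record.getOrElse("date", DefaultDate)
    val date = if (raw.length >= 8) raw else DefaultDate
    date.substring(0, 8)
  }
}

// Same contract as StringSerializer: UTF-8 bytes plus a version that
// deserialize() checks before decoding.
object VersionedStringCodec {
  val Version = 77

  def serialize(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

  def deserialize(version: Int, bytes: Array[Byte]): String =
    if (version != Version) throw new IOException(s"version mismatch: $version")
    else new String(bytes, StandardCharsets.UTF_8)
}
```

Unlike the assigner above, this sketch also guards against a "date" value shorter than 8 characters, which would otherwise make substring throw and fail the task.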
When building the sink, simply pass in the BucketAssigner:
val sinkRow = StreamingFileSink
  .forRowFormat(new Path("D:\\idea_out\\rollfilesink"), new SimpleStringEncoder[ObjectNode]("UTF-8"))
  .withBucketAssigner(new DayBucketAssigner)
  // .withBucketCheckInterval(60 * 60 * 1000L) // 1 hour
  .build()
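Running the job writes each record into a subdirectory of the base path named after its bucket ID, i.e. one directory per day (yyyyMMdd).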
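The resulting directory layout follows from how the sink resolves buckets: each bucket's directory is the base path joined with the bucket ID. A stdlib-only sketch of that mapping is below; BucketLayoutSketch is a hypothetical name, and the dates are assumed to be yyyyMMddHHmmss strings as in DayBucketAssigner.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical sketch: which directory each record lands in, assuming the
// bucket directory is <basePath>/<bucketId> and bucketId = yyyyMMdd.
object BucketLayoutSketch {
  def bucketId(date: String): String = date.substring(0, 8)

  // Counts records per bucket directory
  def recordsPerDirectory(basePath: String, dates: Seq[String]): Map[Path, Int] =
    dates.groupBy(bucketId).map { case (day, group) => Paths.get(basePath, day) -> group.size }
}
```

For three records dated 2023-07-09 (twice) and 2023-07-10 under base path "out", this gives two directories, out/20230709 with two records and out/20230710 with one.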
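2. forBulkFormat works differently from forRowFormat: you implement a BulkWriter.Factory (here DayBulkWriterFactory) and a BulkWriter (DayBulkWriter) yourself. This allows a high degree of customization, since the writer works directly on the FSDataOutputStream and can produce files in any format. (A custom Encoder with forRowFormat can do this too, but it is not as flexible as forBulkFormat.)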
// use the custom BulkWriter factory and DayBucketAssigner
val sinkBulk = StreamingFileSink
  .forBulkFormat(new Path("D:\\idea_out\\rollfilesink"), new DayBulkWriterFactory)
  .withBucketAssigner(new DayBucketAssigner())
  .build()
The implementation:
import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.serialization.BulkWriter
import org.apache.flink.core.fs.FSDataOutputStream
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.util.Preconditions

/**
 * Based on: org.apache.flink.streaming.api.functions.sink.filesystem.BulkWriterTest
 */
class DayBulkWriter(inputStream: FSDataOutputStream) extends BulkWriter[ObjectNode] {
  private val charset = StandardCharsets.UTF_8
  private val stream: FSDataOutputStream = Preconditions.checkNotNull(inputStream)

  /**
   * Writes one element as a line of JSON.
   */
  override def addElement(element: ObjectNode): Unit = {
    stream.write(element.toString.getBytes(charset))
    stream.write('\n'.toInt) // newline-delimited, so records stay separable
  }

  override def flush(): Unit = stream.flush()

  /**
   * The stream is handed in by the sink: finish() only flushes and must not close it;
   * the sink closes the stream itself after finish() returns.
   */
  override def finish(): Unit = flush()
}

/**
 * Based on: org.apache.flink.streaming.api.functions.sink.filesystem.BulkWriterTest.TestBulkWriterFactory
 */
class DayBulkWriterFactory extends BulkWriter.Factory[ObjectNode] {
  override def create(out: FSDataOutputStream): BulkWriter[ObjectNode] = new DayBulkWriter(out)
}
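The subtle part of a bulk writer is the division of work with the sink: the BulkWriter.finish() Javadoc states that it must not close the stream, because the caller closes it afterwards. The stdlib-only sketch below models one part file's lifecycle (create, addElement, finish, then close by the caller). LineBulkWriter, TrackingStream and BulkLifecycleSketch are hypothetical names, and a plain OutputStream stands in for FSDataOutputStream.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical writer with the same addElement/flush/finish shape as BulkWriter
class LineBulkWriter(stream: OutputStream) {
  def addElement(element: String): Unit = {
    stream.write(element.getBytes(StandardCharsets.UTF_8))
    stream.write('\n'.toInt) // newline-delimited records
  }
  def flush(): Unit = stream.flush()
  // Only flushes: closing the stream is left to the caller
  def finish(): Unit = flush()
}

// Records whether close() was called, to show who owns the stream
class TrackingStream extends ByteArrayOutputStream {
  var wasClosed = false
  override def close(): Unit = { wasClosed = true; super.close() }
}

object BulkLifecycleSketch {
  // Mimics the sink for one part file; returns (content, closed after finish, closed at end)
  def writePartFile(records: Seq[String]): (String, Boolean, Boolean) = {
    val out = new TrackingStream
    val writer = new LineBulkWriter(out) // corresponds to factory.create(out)
    records.foreach(writer.addElement)
    writer.finish()
    val closedAfterFinish = out.wasClosed // still open: finish() did not close it
    out.close()                           // the caller closes the stream
    (out.toString(StandardCharsets.UTF_8.name()), closedAfterFinish, out.wasClosed)
  }
}
```

Keeping close() out of finish() is what allows the sink to finalize the part file itself (for example when committing it on a checkpoint) after the writer has flushed everything.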
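I won't go through the output of this version in detail.
One more problem came up: StreamingFileSink offers no way to specify the names of the output files.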