热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

快速入门流处理框架Flink实时报表场景的应用

随着业务的发展,数据量剧增,我们一些简单报表大盘类的任务,就不能简单的依赖于RDBMS了,而是依赖于数仓之类的大数据平台。数仓有着巨量数据的存储能力,但是一般都存在一定数据延迟,

  随着业务的发展,数据量剧增,我们一些简单报表大盘类的任务,就不能简单的依赖于RDBMS了,而是依赖于数仓之类的大数据平台。

  数仓有着巨量数据的存储能力,但是一般都存在一定数据延迟,所以要想完全依赖数数仓来解决实时报表问题,是困难的。

  其实,所谓的实时报表,往简单了说就是: 对现在的一些数据进行加减乘除聚合后,得到的一串与时间相关的数字。

  所以,这类问题的关键点应该在于这个实时数据怎么来,以及怎么处理这些实时数据。

 

  一般地,做这类报表类工作,最基本的原则就是: 业务无侵入性,然后又要做到实时。

 

  所以,本能性地想到,使用消息中间件来解耦这个数据就好了,Kafka 可能是个比较好的选择。当然,这个前提是业务技术都是使用这一套东西的,如果没有,则可能想另外的招了,比如: binlog 解析?

 

  有了数据来源之后,我们就可以做相应的报表数据了。

  前面既然提到,报表基本上就是进行简单的加减乘除,那就是很简单了呗。

  也就是,自己起几个kafka消费者,然后消费数据,运算后,得到结果,然后存入DB中,而已。

  所以,完全可以去做这么一件事。但是你知道,凡事不会那么简单,你要处理多少异常:时间边界问题,宕机问题,业务新增问题。。。

  

  不多说了,回到本文正题:像这类场景,其实就是简单的流处理流计算而已,早已相应的开发模块被提炼出来,咱们只要学会使用就好了。

  Flink是其中做得比较好的一个框架,据说也是未来的一个趋势。既然如此,何不学他一学。

 

  Flink,流计算,感觉挺难啊!

  其实不然,就像前面我们提到解决方案一样,入门就是这么简单。

 

  好,接下来我们通过一个 flink-demo,试着入门一下!


解释:
  1. 以下demo的应用场景是: 统计1分钟类的渠道下单数量;
  2. 数据源源为kakfa;
  3. 数据输出存储为kafka和控制台;

 

真实的代码如下:

package com.my.flink.kafka.consumer;

import com.my.flink.config.KafkaConstantProperties;
import com.my.flink.kafka.serializer.KafkaTuple4StringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * 用java 写消费者
 *
 */
public class ConsumeKafkaByJava {

    private static final String CONSUMER_GROUP_ID = "test.flink.consumer1";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.enableCheckpointing(1000);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", KafkaConstantProperties.KAFKA_BROKER);
        kafkaProps.setProperty("group.id", CONSUMER_GROUP_ID);

        FlinkKafkaConsumer010 myCOnsumer= new FlinkKafkaConsumer010(
                KafkaConstantProperties.FLINK_COMPUTE_TOPIC_IN1,
                new SimpleStringSchema(),
                kafkaProps);

        DataStream dataStream = env.addSource(myConsumer);
        // 四元组数据为: 订单号,统计维度标识,订单数,订单金额
        DataStream> counts = dataStream
                .flatMap(new TestBizDataLineSplitter())
                .keyBy(1)
                .timeWindow(Time.of(30, TimeUnit.SECONDS))
                .reduce((value1, value2) -> {
                    return new Tuple4<>(value1.f0, value1.f1, value1.f2 + value2.f2, value1.f3 + value2.f3);
                });
        
        // 暂时输入与输出相同
        counts.addSink(new FlinkKafkaProducer010<>(
                KafkaConstantProperties.FLINK_DATA_SINK_TOPIC_OUT1,
                new KafkaTuple4StringSchema(),
                kafkaProps)
        );
        // 统计值多向输出
        dataStream.print();
        counts.print();
        env.execute("Test Count from Kafka data");
    }

}

  如上,就是一个 flink 的统计代码了,简单不?肯定简单!

  不过,单这个东西肯定是跑不起来的,我们还需要框架基础依赖附加模板工作,不过这些真的只是 copy 而已哦。

1. pom.xml 依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0modelVersion>
  <groupId>com.my.flinkgroupId>
  <artifactId>flink-kafka-testartifactId>
  <version>1.0-SNAPSHOTversion>
  <inceptionYear>2008inceptionYear>
  <properties>
    <scala.version>2.11.6scala.version>
  properties>

  <repositories>
    <repository>
      <id>scala-tools.orgid>
      <name>Scala-Tools Maven2 Repositoryname>
      <url>http://scala-tools.org/repo-releasesurl>
    repository>
  repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.orgid>
      <name>Scala-Tools Maven2 Repositoryname>
      <url>http://scala-tools.org/repo-releasesurl>
    pluginRepository>
  pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-langgroupId>
      <artifactId>scala-libraryartifactId>
      <version>${scala.version}version>
    dependency>
    <dependency>
      <groupId>junitgroupId>
      <artifactId>junitartifactId>
      <version>4.4version>
      <scope>testscope>
    dependency>
    <dependency>
      <groupId>org.specsgroupId>
      <artifactId>specsartifactId>
      <version>1.2.5version>
      <scope>testscope>
    dependency>

    <dependency>
      <groupId>org.apache.flinkgroupId>
      <artifactId>flink-coreartifactId>
      <version>1.3.2version>
      <scope>compilescope>
    dependency>

    
    <dependency>
      <groupId>org.apache.flinkgroupId>
      <artifactId>flink-connector-kafka-0.10_2.11artifactId>
      <version>1.3.2version>
      <scope>compilescope>
    dependency>

    
    <dependency>
      <groupId>org.apache.kafkagroupId>
      <artifactId>kafka_2.11artifactId>
      <version>0.10.2.0version>
      <scope>compilescope>
    dependency>

    <dependency>
      <groupId>org.apache.kafkagroupId>
      <artifactId>kafka-clientsartifactId>
      <version>2.3.0version>
      <scope>compilescope>
    dependency>

    
    <dependency>
      <groupId>org.apache.flinkgroupId>
      <artifactId>flink-streaming-java_2.11artifactId>
      <version>1.3.2version>
      <scope>compilescope>
    dependency>



    <dependency>
      <groupId>com.alibabagroupId>
      <artifactId>fastjsonartifactId>
      <version>1.2.59version>
    dependency>

  dependencies>

  <build>
    <sourceDirectory>src/main/scalasourceDirectory>
    <testSourceDirectory>src/test/scalatestSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.pluginsgroupId>
        <artifactId>maven-compiler-pluginartifactId>
        <configuration>
          <source>1.8source>
          <target>1.8target>
        configuration>
      plugin>
      <plugin>
        <groupId>org.apache.maven.pluginsgroupId>
        <artifactId>maven-jar-pluginartifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>trueaddClasspath>
              <useUniqueVersions>falseuseUniqueVersions>
              <classpathPrefix>lib/classpathPrefix>
              <mainClass>com.my.flink.kafka.consumer.ConsumeKafkaByJavamainClass>
            manifest>
          archive>
        configuration>
      plugin>
      <plugin>
        <groupId>org.scala-toolsgroupId>
        <artifactId>maven-scala-pluginartifactId>
        <executions>
          <execution>
            <goals>
              <goal>compilegoal>
              <goal>testCompilegoal>
            goals>
          execution>
        executions>
        <configuration>
          <scalaVersion>${scala.version}scalaVersion>
          <args>
            <arg>-target:jvm-1.5arg>
          args>
        configuration>
      plugin>
      <plugin>
        <groupId>org.apache.maven.pluginsgroupId>
        <artifactId>maven-eclipse-pluginartifactId>
        <configuration>
          <downloadSources>truedownloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilderbuildcommand>
          buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanatureprojectnature>
          additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINERclasspathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINERclasspathContainer>
          classpathContainers>
        configuration>
      plugin>
      <plugin>
        <groupId>org.apache.maven.pluginsgroupId>
        <artifactId>maven-assembly-pluginartifactId>
        <version>2.4.1version>
        <configuration>
          
          <descriptorRefs>
            <descriptorRef>jar-with-dependenciesdescriptorRef>
          descriptorRefs>
          
          <archive>
            <manifest>
              <mainClass>com.my.flink.kafka.consumer.ConsumeKafkaByJavamainClass>
            manifest>
          archive>

        configuration>
        <executions>
          <execution>
            <id>make-assemblyid>
            
            <phase>packagephase>
            <goals>
              <goal>singlegoal>
            goals>
          execution>
        executions>
      plugin>

    plugins>
  build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-toolsgroupId>
        <artifactId>maven-scala-pluginartifactId>
        <configuration>
          <scalaVersion>${scala.version}scalaVersion>
        configuration>
      plugin>
    plugins>
  reporting>
project>

 

2. 另外再加几个辅助类:

// 1. 
package com.my.flink.config;

/**
 * kafka 相关常量定义
 *
 */
public class KafkaConstantProperties {

    /**
     * kafka broker 地址
     */
    public static final String KAFKA_BROKER = "127.0.0.1:9092";

    /**
     * zk 地址,低版本 kafka 使用,高版本已丢弃
     */
    public static final String ZOOKEEPER_HOST = "master:2181,slave1:2181,slave2:2181";

    /**
     * flink 计算使用topic 1
     */
    public static final String FLINK_COMPUTE_TOPIC_IN1 = "mastertest";

    /**
     * flink消费结果,输出到kafka, topic 数据
     */
    public static final String FLINK_DATA_SINK_TOPIC_OUT1 = "flink_compute_result_out1";

}

// 2. 
package com.my.flink.kafka.formatter;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;

/**
 * 原始消息参数处理类
 *
 */

public final class TestBizDataLineSplitter implements FlatMapFunction<String,
                                    Tuple4> {

    private static final long serialVersiOnUID= 1L;

    /**
     * 进行 map 阶段展开操作
     *
     * @param value 原始值: bizData: 2019-08-01 17:39:32,
     *              P0001,channel1,201908010116100001,100
     *
     *              dateTimeMin,
     *              productCode, channel,
     *              orderId, money
     *              [, totalCount, totalMoney]
     *
     * @param out 输出值, 用四元组保存
     *
     */
    @Override
    public void flatMap(String value, CollectorString, String,
                                                        Integer, Double>> out) {
        String[] tokens = value.split(",");
        String time = tokens[0].substring(0, 16);
        String uniqDimKey = time + "," + tokens[1] + "," + tokens[2];
        // totalCount: 1, totalPremium: premium
        // todo: 写成 pojo

        out.collect(new Tuple4<>(tokens[3], uniqDimKey, 1, Double.valueOf(tokens[4])));
    }

}

// 3. 
package com.my.flink.kafka.serializer;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * kafka 自定义序列化器
 */
public class KafkaTuple4StringSchema  implements DeserializationSchema>, SerializationSchema> {

    private static final long serialVersiOnUID= -5784600791822349178L;

    // ------------------------------------------------------------------------
    //  Kafka Serialization
    // ------------------------------------------------------------------------

    /** The charset to use to convert between strings and bytes.
     * The field is transient because we serialize a different delegate object instead */
    private transient Charset charset;

    private String separator = ",";

    /**
     * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
     */
    public KafkaTuple4StringSchema() {
        this(StandardCharsets.UTF_8);
    }

    /**
     * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
     *
     * @param charset The charset to use to convert between strings and bytes.
     */
    public KafkaTuple4StringSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    @Override
    public Tuple4 deserialize(byte[] message) {
        String rawData = new String(message, StandardCharsets.UTF_8);
        String[] dataArr = rawData.split(separator);
        return new Tuple4<>(dataArr[0], dataArr[1],
                            Integer.valueOf(dataArr[2]), Double.valueOf(dataArr[3]));
    }

    @Override
    public boolean isEndOfStream(Tuple4 nextElement) {
        return false;
    }

    @Override
    public byte[] serialize(Tuple4 element) {
        return (element.f0 + separator +
                element.f1 + separator +
                element.f2 + separator +
                element.f3).getBytes();
    }

    @Override
    public TypeInformation> getProducedType() {
        return null;
    }

}

 

  这样,加上上面的 demo, 其实就可以跑起来了。

 

下面我们从demo里看看 flink 的开发套路:

        // 1. 获取运行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 2. 配置接入数据源
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", KafkaConstantProperties.KAFKA_BROKER);
        kafkaProps.setProperty("group.id", CONSUMER_GROUP_ID);
        FlinkKafkaConsumer010 myCOnsumer= new FlinkKafkaConsumer010(
                KafkaConstantProperties.FLINK_COMPUTE_TOPIC_IN1,
                new SimpleStringSchema(),
                kafkaProps);
        DataStream dataStream = env.addSource(myConsumer);
        
        // 3. 处理数据
        DataStream> counts = dataStream
                .flatMap(new ProposalBizDataLineSplitter())
                .keyBy(1)
                .timeWindow(Time.of(30, TimeUnit.SECONDS))
                .reduce((value1, value2) -> {
                    return new Tuple4<>(value1.f0, value1.f1, value1.f2 + value2.f2, value1.f3 + value2.f3);
                });

        // 4. 输出处理结果
        counts.addSink(new FlinkKafkaProducer010<>(
                KafkaConstantProperties.FLINK_DATA_SINK_TOPIC_OUT1,
                new KafkaTuple4StringSchema(),
                kafkaProps)
        );
        // 统计值多向输出
        dataStream.print();
        counts.print();
        
        // 5. 正式提交运行
        env.execute("Test Count from Kafka data");
    

  其实就5个步骤,而且自己稍微想想,除了第5个步骤外,这些也都是必须的东西,再无多余了。
    1. 获取运行环境
    2. 配置接入数据源
    3. 处理数据
    4. 输出处理结果
    5. 正式提交运行

  所以,你觉得复杂吗?除了那些模板?(模板从来都是复制)

  所以,我们可以随意使用这些框架来帮我们处理事务吗?

  你还得看下公司的环境:比如 资金支持、运维支持、框架支持?

  总之,入门很简单,但不要以为真简单!(保持敬畏之心)

接下来,我们来看一下关于Flink的一些架构问题:

和大多数的大数据处理框架一样,Flink也是一种 master-slave 架构;如图:

 

  简单点说就是,flink 是一套自管理的运行环境,你只需按照flink范式编写代码,提交到集群运行即可。

Flink 抽象层级:

 

Flink 的重要特性:

  支持高吞吐、低延迟、高性能的流处理
  支持带有事件时间的窗口操作
  支持有状态计算的Exactly-once语义
  支持高度灵活的窗口操作,支持基于time、count、session,以及data-driven的窗口操作
  支持具有Backpressure功能的持续流模型
  支持基于轻量级分布式快照实现的容错
  支持批流合一处理
  Flink在JVM内部实现了自己的内存管理
  支持迭代计算
  支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
  支持Table-API的操作
  支持SQL式友好开发

 

重要概念解释:


  Watermark: 是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性;包含: eventTime / IngestionTime / processTime
  DataStream 流处理, DataSet 批处理;
  Window: TumblingWindow / SlidingWindow / SessionWindow / CountWindow
  Map: 一对一映射数据流,flatMap: 一对N数据流映射;
  Filter: 过滤返回false的数据,keyBy: 将相同key的DataStream分配到同一分区以便进行聚合计算, reduce: 将数据合并为一个新的数据;
  Sink: 输出,RichSinkFunction 实现自定义输出;基于文件的:如 writeAsText()、writeAsCsv()、writeUsingOutputFormat、FileOutputFormat。 写到socket: writeToSocket。 用于显示的:print、printToErr。 自定义Sink: addSink。connectors 用于给接入第三方数据提供接口,现在支持的connectors 包括:Apache Kafka/Apache Cassandra/Elasticsearch/Hadoop FileSystem/RabbitMQ/Apache NiFi
  SnapShot:由于 Flink 的 checkpoint 是通过分布式快照实现的,接下来我们将 snapshot 和 checkpoint 这两个词交替使用。由于 Flink checkpoint 是通过分布式 snapshot 实现的,snapshot 和 checkpoint 可以互换使用。
  Backpressure: 反压通常产生于这样的场景:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或者遇到大促或秒杀活动导致流量陡增。反压如果不能得到正确的处理,可能会导致资源耗尽甚至系统崩溃。

 

唠叨: 方向。


推荐阅读
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Go GUIlxn/walk 学习3.菜单栏和工具栏的具体实现
    本文介绍了使用Go语言的GUI库lxn/walk实现菜单栏和工具栏的具体方法,包括消息窗口的产生、文件放置动作响应和提示框的应用。部分代码来自上一篇博客和lxn/walk官方示例。文章提供了学习GUI开发的实际案例和代码示例。 ... [详细]
  • Spring学习(4):Spring管理对象之间的关联关系
    本文是关于Spring学习的第四篇文章,讲述了Spring框架中管理对象之间的关联关系。文章介绍了MessageService类和MessagePrinter类的实现,并解释了它们之间的关联关系。通过学习本文,读者可以了解Spring框架中对象之间的关联关系的概念和实现方式。 ... [详细]
  • (三)多表代码生成的实现方法
    本文介绍了一种实现多表代码生成的方法,使用了java代码和org.jeecg框架中的相关类和接口。通过设置主表配置,可以生成父子表的数据模型。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 第四章高阶函数(参数传递、高阶函数、lambda表达式)(python进阶)的讲解和应用
    本文主要讲解了第四章高阶函数(参数传递、高阶函数、lambda表达式)的相关知识,包括函数参数传递机制和赋值机制、引用传递的概念和应用、默认参数的定义和使用等内容。同时介绍了高阶函数和lambda表达式的概念,并给出了一些实例代码进行演示。对于想要进一步提升python编程能力的读者来说,本文将是一个不错的学习资料。 ... [详细]
  • Java自带的观察者模式及实现方法详解
    本文介绍了Java自带的观察者模式,包括Observer和Observable对象的定义和使用方法。通过添加观察者和设置内部标志位,当被观察者中的事件发生变化时,通知观察者对象并执行相应的操作。实现观察者模式非常简单,只需继承Observable类和实现Observer接口即可。详情请参考Java官方api文档。 ... [详细]
  • 数组的排序:数组本身有Arrays类中的sort()方法,这里写几种常见的排序方法。(1)冒泡排序法publicstaticvoidmain(String[]args ... [详细]
  • 本文介绍了利用ARMA模型对平稳非白噪声序列进行建模的步骤及代码实现。首先对观察值序列进行样本自相关系数和样本偏自相关系数的计算,然后根据这些系数的性质选择适当的ARMA模型进行拟合,并估计模型中的位置参数。接着进行模型的有效性检验,如果不通过则重新选择模型再拟合,如果通过则进行模型优化。最后利用拟合模型预测序列的未来走势。文章还介绍了绘制时序图、平稳性检验、白噪声检验、确定ARMA阶数和预测未来走势的代码实现。 ... [详细]
  • java.lang.Class.getDeclaredMethod()方法java.lang.Class.getDeclaredMethod()方法用法实例教程-方法返回一个Met ... [详细]
author-avatar
liqiqinai
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有