热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkDataStreamTransform(三)

FlinkDataStreamTransform(三)环境变量      import org.apache.flink.api.scala.ExecutionEnvironmen

Flink DataStream Transform(三)


环境变量

      import org.apache.flink.api.scala.ExecutionEnvironment
      val env = ExecutionEnvironment.getExecutionEnvironment;//批处理运行上下文环境
      import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
      val streamenv = StreamExecutionEnvironment.getExecutionEnvironment//流处理运行上下文环境

map filter flatMap

      import org.apache.flink.streaming.api.scala.createTypeInformation
      streamenv.fromCollection(Array("1,2","3,4","5,6"))
          .flatMap(x=>x.split(",")).map(x=>(x,1)).filter(x=>x._2.toInt>=1).print()

keyby min max

      //样例数据key都是一样的,注意结果打印
      streamenv.fromCollection(Array((1,3),(1,2),(1,1))).keyBy(x=>x._1).sum(1).print()
      streamenv.fromCollection(Array((1,3),(1,2),(1,1))).keyBy(x=>x._1).min(1).print()
      streamenv.fromCollection(Array((1,1),(1,2),(1,3))).keyBy(x=>x._1).max(1).print()
      streamenv.fromCollection(Array((1,1),(1,2),(1,3))).keyBy(x=>x._1).minBy(1).print()
      streamenv.fromCollection(Array((1,3),(1,2),(1,1))).keyBy(x=>x._1).maxBy(1).print()
      streamenv.fromCollection(Array((1,(3,2)),(1,(6,5)),(2,(4,7)))).keyBy(x=>x._1)
          .reduce((x,y)=>(x._1,(x._2._1+y._2._1,x._2._2+y._2._2))).print()
      val data1 = streamenv.fromCollection(Array((1,1),(1,2),(1,3)))
      val data2 = streamenv.fromCollection(Array((2,1),(2,2),(2,3)))
      data2.union(data1).print()

connect map flatMap

    val data1 = streamenv.fromCollection(Array((1),(2),(3)))
    val data2 = streamenv.fromCollection(Array((2,1),(2,2),(2,3)))
    data1.connect(data2).map((x=>x+1),(x=>(x._1+1,x._2))).print()
    data1.connect(data2).flatMap((x=>Array(x)),(x=>Array(x._1,x._2))).print()

分区

    //随机分配
    streamenv.fromCollection(Array((1),(2),(3))).shuffle.print()    
    //对输入流进行重分区
    streamenv.fromCollection(Array((1),(2),(3))).rebalance.print()  
    //接收方任务的数量是发送方任务数量的倍数或者相反  rescale转换更有效
    streamenv.fromCollection(Array((1),(2),(3))).rescale.print()    
    rebalance()会在所有发送任务与所有接收任务之间创建通信通道,而rescale()则只创建从每个任务到下游操作的某些任务的通道。
    streamenv.fromCollection(Array((1),(2),(3))).broadcast.print()  //将数据广播给下游
    streamenv.fromCollection(Array((1),(2),(3))).global.print()     //并行度会变成1

自定义分区

    import org.apache.flink.api.common.functions.Partitioner
    class MyPartitioner  extends Partitioner[Int]{//自定义分区
      override def partition(word: Int, num: Int): Int = {
        if(word>=20 else 1
      }
    }
    streamenv.fromCollection(Array((1),(2),(3))).partitionCustom(new MyPartitioner,x => x ).print()

并行度

      import org.apache.flink.streaming.api.scala.createTypeInformation
      streamenv.setParallelism(1)//设置全局的并行度
      streamenv.fromCollection(Array((1),(2),(3))).setParallelism(2)//每一个算子  都可以有并行度

    MapFunction:实现自定义的map函数,就是map中的表达式可以抽成一个class。个人感觉还不如直接写表达式
    FilterFunction:实现自定义的filter函数
    RichFunction:普通函数没有办法在执行之前进行变量初始化。RichFunction可以(上一章其实有样例的,后面也会有)
    ProcessFunction:功能最强大的接口,可以获取上下文环境等信息(先记一下,后续详细看,加强版本的richfunction)
          ProcessFunctionKeyedProcessFunctionCoProcessFunction
          ProcessJoinFunctionBroadcastProcessFunctionKeyedBroadcastProcessFunction
          ProcessWindowFunctionProcessAllWindowFunction

水位线和watermark

数据是源源不断的,处理数据的时候只能一次处理一段时间的数据。
这时候就需要一个字段告诉程序当前时间,数据会有延迟,所以就需要在时间基础上延迟。
这就是水位线和watermark的简单理解


    //水位线:指示时间进展的标记延迟一段时间关闭    
    //周期性的产生标记/比较之前的标记/整体延迟等待 当前是10s记录的是0
    //任务中比较之前的标记(以最慢的为主)     水位线是由上游任务广播到下游任务的
    import org.apache.flink.streaming.api.scala.createTypeInformation
    val data = streamenv.fromCollection(Array((1,1000),(2,2001),(3,9002))).map(x=>(x._1,x._2,1)).keyBy(x=>x._1)
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
    import org.apache.flink.util.Collector
    import java.time.Duration
    import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
    streamenv.getConfig.setAutoWatermarkInterval(100)//设置小的延迟处理小大批量的数据
    //有序watermark生成
    data.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(
      new SerializableTimestampAssigner[Tuple3[Int,Int,Int]]{
      override def extractTimestamp(t: Tuple3[Int,Int,Int], l: Long): Long = t._2.toLong//获取时间字段位置
    }))
    //乱序程度可以确定的
    data.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2))
      .withTimestampAssigner(new SerializableTimestampAssigner[Tuple3[Int,Int,Int]](){
      override def extractTimestamp(t: Tuple3[Int,Int,Int], l: Long): Long = t._2.toLong//获取时间字段位置
    }))

ProcessFunction 处理 侧输出流处理

数据延迟有水位线处理,但更延迟的数据我想单独处理怎么办?侧输出流


    import org.apache.flink.streaming.api.scala.OutputTag
    val latetag = new OutputTag[String]("late")
    class SplitProcessFunction(latetag:OutputTag[String]) extends ProcessFunction[String,String{
      override def processElement(input: String, ctx: ProcessFunction[StringString]#Context, 
            out: Collector[String]): Unit = {
        if(input.toLong>1000){
          out.collect(input)
        }else{
          ctx.output(latetag,input)
        }
      }
    }
    val data = streamenv.fromCollection(Array((1,1000),(2,2001),(3,9002))).map(x=>(x._1,x._2,1)).keyBy(x=>x._1)
    val success = data.map(x=>(x._2.toString)).process(new SplitProcessFunction(latetag))
    val fail = success.getSideOutput(latetag).print()

状态

数据源源不断过来,这时候需要暂存一些中间数据,flink提供了一些数据结构,这就是状态


    //    flink中的状态   算子状态(算子状态的作用范围限定为算子任务)   
    //    键控状态(keyby 以后根据输入数据流中定义的key来维护和访问)
    //    算子状态(liststate、unionliststate、broadcaststate)
    //    键控状态(对每一个key都保存了一份状态,相同key可以访问状态)
    //   (valuestate、liststate、mapstate、reducestate&Aggregatingstate)
    import org.apache.flink.api.common.state.MapState//键控状态 样例
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, 
         ValueStateDescriptor,ValueState,MapState,MapStateDescriptor,ReducingState}
    import org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction, RichMapFunction}
    class MyMapper extends RichMapFunction[String,String{
      var valuestate:ValueState[String] = _
      var liststate:ListState[String] = _
      var mapstate:MapState[String,String] = _
      var reducestate:ReducingState[String] = _//reducestate = getRuntimeContext.getReducingState(
          new ReducingStateDescriptor[String]("reducestate",,classOf[String]))
      override def open(parameters: Configuration): Unit ={
        valuestate = getRuntimeContext.getState(new ValueStateDescriptor[String]("valuestate",classOf[String]))
        liststate = getRuntimeContext.getListState(new ListStateDescriptor[String]("liststate",classOf[String]))
        mapstate = getRuntimeContext.getMapState(new MapStateDescriptor[String,String]
             ("mapstate",classOf[String],classOf[String]))
      }
      override def map(inString): String = {
        valuestate.value().substring(0,1)
        valuestate.update("1")
        liststate.add("1")
        liststate.get()//获取到列表
        liststate.update(null)
        mapstate.put("1","1")
        mapstate.remove("1")
        reducestate.get()
        reducestate.add("1")
        "1"
      }
    }

状态后端

上面的状态读写数据以后是需要存储的,可以存储在内存外部系统等一些存储系统中。
这就涉及到了一些存储系统和存储策略。这就是状态后端



状态后端:状态的存储、序列化、checkpoint

    MemoryStateBackend:内存级别的状态后端,存储再taskManager的JVM堆上,
        checkpoint存储在jobmanager的内存中,读取写入比较快,但是不稳定
    FsStateBackend:存储再taskManager的JVM堆上,checkpoint存储在jobmanager的文件系统中,内存状态的限制
    RocksDBStateBackend:所有状态序列化后,存储再本地的DB中

checkpoint一致性检查

    //保存所有任务处理完相同输入数据的时候   kafka下标6执行完成以后  都保存这个状态
    //检查点和数据处理分离开  不暂定整个应用(再某一个数据后面插入一个标记位   再这个数据处理完成以后进行checkpoint)
    //jobmanager 通知所有source进行检查点的保存,source进行check保存以后,广播告诉下游checkpointid及下标
    //聚合任务收到聚合数据下标(等到数据下标过来的时候等到其他聚合任务收到下标数据的时候共同保存) 
    //这时候来的其他数据缓存   最大的时间消耗是向远程同步

状态后端保存策略

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    sourceTestEnv.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE)
    //source添加完成  到所有节点完成时间
    sourceTestEnv.getCheckpointConfig.setCheckpointTimeout(60*1000l)
    //最大允许的checkpoint
    sourceTestEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    //保证checkpoint之间的间隔   延迟checkpoint发生的时间  设置这个上面同时checkpoint就是1
    sourceTestEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
    //checkpoint 失败多少次
    sourceTestEnv.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)
    //checkpoint重启策略
    //参数:尝试重启次数和尝试重启时间间隔
    sourceTestEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000))
    //参数:失败率  时间间隔  两次重启时间间隔
    sourceTestEnv.setRestartStrategy(RestartStrategies.failureRateRestart(5,
        org.apache.flink.api.common.time.Time.minutes(5),
        org.apache.flink.api.common.time.Time.seconds(10)))

状态查询

    QueryableStateClient
    <dependency>
      <groupid>org.apache.flinkgroupid>
      <artifactid>flink-queryable-state-clientjava_2.12artifactid>
      <version>1.7.1version>
    dependency>

其他概念

    保存点  savepoint  有计划的时间备份/更新应用程序/版本迁移/暂停和重启应用/资源的重新分配
        data.uid("保存的id名称")
    状态一致性   数据不能丢,也不能重复计算   AT-MOST-ONCE(最多一次)    AT-LEAST-ONCE(至少一次)    EXACTLY-ONCE(精确一次)
    EXACTLY-ONCE如何保证: 内部checkpoint  source可以重新读取数据  sink保证不会重复写入(幂等写入/事务写入)
    幂等写入(写一次和写多次结果是一样的hashmap)   事务写入(原子性 等到checkpoint完成以后 再写入到外部系统 (预写日志 效率不高))
    两阶段提交:每个checkpoint和sink任务都会启动一个事务    实现了EXACTLY-ONCE 需要实现支持外部的接口(TwoPhaseCommitSinkFunction)
    两阶段提交对外部系统有要求:事务支持/checkpoint间隔时间内必须开启一个事务/完成以前事务是等待提交状态/能够恢复事务/提交事务是幂等操作
    flink_kafka 保证状态一致性  :

有状态的flatMap

    import org.apache.flink.api.common.functions.{RichFlatMapFunction}
    class MyFlatMap(tmp:Doubleextends RichFlatMapFunction[String,(String,Double,Double)] {
      var valuestate:ValueState[String] = _
      override def open(parameters: Configuration): Unit = {
        valuestate = getRuntimeContext.getState(new ValueStateDescriptor[String]("valuestate",classOf[String]))
      }
      override def flatMap(inString, collector: Collector[(String, Double, Double)]): Unit = {
        if(tmp < valuestate.value().toDouble){
          collector.collect(valuestate.value(),tmp,valuestate.value().toDouble)
          valuestate.update(tmp.toString)
        }
      }
    }
    import scala.collection.immutable.List.empty
    val data = streamenv.fromCollection(Array((1,1000),(2,2001),(3,9002))).map(x=>(x._1,x._2,1)).keyBy(x=>x._1)
    data.map(x=>(x,1)).keyBy(x=>x._1)//必须是再keyby以后
      .flatMapWithState[(String,Int),String] = {//输出类型,中间结果的类型
      case ((x,y),None)=>(empty,Some(x))//data输入的数据  None输出的类型
      case ((x,y),last:Some[String])=>(Tuple2(last.get,y),Some(x))//第一个值换成上一个的值
    }

水位线的高级处理

    import org.apache.flink.streaming.api.scala.createTypeInformation
    import org.apache.flink.util.Collector
    import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
    //通过水位线可以简单利用水位线,但是如果我们需要获取当前水位线等操作是无法实现的,这时候就需要使用ProcessFunction
    val data = streamenv.fromCollection(Array((1,1000),(2,2001),(3,9002))).map(x=>(x._1,x._2,1)).keyBy(x=>x._1)
    data.process(new MyKeyedProcessFunction).print()
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    //key的类型  输入类型  输出类型  添加定时器
    class MyKeyedProcessFunction extends KeyedProcessFunction[Int,(Int,Int,Int),String]{
      var lastvalue:ValueState[String] = _
      var lastts:ValueState[Long] = _
      override def open(parameters: Configuration): Unit = {
        lastvalue = getRuntimeContext.getState(new ValueStateDescriptor[String]("lastvalue",classOf[String]))
        lastts = getRuntimeContext.getState(new ValueStateDescriptor[Long]("lastts",classOf[Long]))
      }
      override def processElement(input: (Int,Int,Int), ctx: KeyedProcessFunction[Int, (Int,Int,Int), String]#Context, collector: Collector[String]): Unit = {
//        ctx.output(empty) //侧输出流   ctx.timerService().currentWatermark()//当前的水位线   ctx.timerService().registerEventTimeTimer(ctx.timestamp()+10*1000)
        if(lastvalue.value()>input.toString() && lastts.value()==0)
          ctx.timerService().registerEventTimeTimer(ctx.timestamp()+10*1000)//注册当前时间戳的定时器
        else if(lastvalue.value()0){
          ctx.timerService().deleteEventTimeTimer(lastts.value()+10*1000)//注册当前时间戳的定时器
          lastts.clear()
        }
        lastts.update(input.toString().toLong)
        lastvalue.update(input.toString())
      }
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Int, (Int,Int,Int), String]#OnTimerContext, out: Collector[String]): Unit = {
        //多个定时器通过if else 判断
        System.out.println("一直上升")
      }
    }

窗口

flink只能处理一段时间内的数据,这个一段时间就可以用窗口来切分


    //数据延迟可以通过:可以启动多个窗口(桶)、数据往多个窗口里面扔
    //窗口的分类:1.按照驱动类型分类(时间窗口和计数窗口)
    //          2.分配数据类型分类(滚动窗口[当前窗口的大小,没有重迭]、
    //          滑动窗口[窗口大小,滑动的距离,会重迭]、会话窗口[设置会话超时时间]、
    //          全局窗口[全局有效 不会触发计算])
    import org.apache.flink.streaming.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows//事件时间处理窗口
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows//事件时间处理窗口
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows//事件时间处理窗口
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.assignersTumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    val sourceTestEnv = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.table.api._
    val windowdata = sourceTestEnv.fromElements[String]
      ("1,1663828406000","2,1663828416000","3,1663828426000","4,1663828436000")
    windowdata.map(x=>(x,1)).keyBy(x=>x._1).window(SlidingEventTimeWindows.of(Time.minutes(10),Time.minutes(5)))
    windowdata.map(x=>(x.split(",").apply(0),x.split(",").apply(1))).keyBy(x=>x._1)//按照用户分区
      .window(SlidingEventTimeWindows.of(Time.minutes(10),Time.minutes(5)))//滑动事件窗口  1窗口大小 2滑动距离
//      .window(TumblingEventTimeWindows.of(Time.minutes(5)))//滚动事件窗口  滚动大小
//      .window(EventTimeSessionWindows.withGap(Time.seconds(2)))//时间时间会话窗口
//      .countWindow(2000,500)//一个参数滚动窗口   两个参数滑动窗口
    windowdata.map(x=>(x.split(",").apply(0),x.split(",").apply(1)))//不keyby的操作
      .windowAll(EventTimeSessionWindows.withGap(Time.seconds(2)))//并行度是1

watermark window 联合使用

    import org.apache.flink.streaming.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}//事件时间处理窗口
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    val sourceTestEnv = StreamExecutionEnvironment.getExecutionEnvironment
    //
    val watermark_window_data = sourceTestEnv.fromElements[String]
        ("1,1663828406000","2,1663828416000","3,1663828426000","4,1663828436000")
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
    import org.apache.flink.streaming.api.scala.OutputTag
    val watermark_window_data_latetag = new OutputTag[(String, Int)]("late")
    sourceTestEnv.getConfig.setAutoWatermarkInterval(100)
    val result = watermark_window_data.assignTimestampsAndWatermarks(
      WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(
        new SerializableTimestampAssigner[String](){
          override def extractTimestamp(t: Stringl: Long): Long = t.split(",").apply(3).toLong
        })).map(x=>(x,1)).keyBy(x=>x._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .allowedLateness(Time.minutes(10))//处理迟到的数据  迟到的数据立刻输出
      .sideOutputLateData(watermark_window_data_latetag)
      .reduce((x,y)=>(x._1,x._2+y._2))
    result.getSideOutput(watermark_window_data_latetag).print()

搬砖多年终不得要领,遂载源码看之望得真经。



推荐阅读
  • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
    报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 透明木头问世!“木头大王”胡良兵再发顶刊,已成立公司加速落地69
    道翰天琼认知智能机器人平台API接口大脑为您揭秘。木材是人类最古老的建筑材料之一,也是一种绿色节能材料,我们对其外观的认知可谓根深蒂固。如今,随着透明木材的问世,这一观感将被颠覆。 ... [详细]
  • Flink(三)IDEA开发Flink环境搭建与测试
    一.IDEA开发环境1.pom文件设置1.8 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文介绍了机器学习手册中关于日期和时区操作的重要性以及其在实际应用中的作用。文章以一个故事为背景,描述了学童们面对老先生的教导时的反应,以及上官如在这个过程中的表现。同时,文章也提到了顾慎为对上官如的恨意以及他们之间的矛盾源于早年的结局。最后,文章强调了日期和时区操作在机器学习中的重要性,并指出了其在实际应用中的作用和意义。 ... [详细]
  • 本文讨论了在dva中引入antd组件table时没有显示样式的问题。提供了.roadhogrc文件的配置,包括环境和import的设置。同时介绍了extraBabelPlugins和transform-runtime的使用方法,并解释了libraryName和css的含义。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 本文介绍了使用readlink命令获取文件的完整路径的简单方法,并提供了一个示例命令来打印文件的完整路径。共有28种解决方案可供选择。 ... [详细]
  • 使用freemaker生成Java代码的步骤及示例代码
    本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
  • 关于如何快速定义自己的数据集,可以参考我的前一篇文章PyTorch中快速加载自定义数据(入门)_晨曦473的博客-CSDN博客刚开始学习P ... [详细]
  • 本文整理了Java中org.apache.pig.backend.executionengine.ExecException.<init>()方法的一些代码 ... [详细]
  • 逻辑回归_训练二元分类器#训练一个二元分类器fromsklearn.linear_modelimportLogisticRegressionfromsklearnimport ... [详细]
  • 马蜂窝数据总监分享:从数仓到数据中台,大数据演进技术选型最优解
    大家好,今天分享的议题主要包括几大内容:带大家回顾一下大数据在国内的发展,从传统数仓到当前数据中台的演进过程;我个人认为数 ... [详细]
author-avatar
一恒谢永泰_661
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有