热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flinksql设置并行度_Flink状态:KeyedState和OperatorListState详细教程

本文将重点跟大家讲解Flink的状态管理机制,包括状态要解决的问题、Flink几种不同类型的状态、KeyedState和OperatorListState的使用方法等

本文将重点跟大家讲解Flink的状态管理机制,包括状态要解决的问题、Flink几种不同类型的状态、Keyed State和Operator List State的使用方法等。相关代码参见的github:https://github.com/luweizheng/flink-tutorials。图片文字均为原创,转载请联系本专栏。

为什么要管理状态

有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。下面的几个场景都需要使用流处理的状态功能:

  • 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
  • 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
  • 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
  • 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

我们知道,Flink的一个算子有多个子任务,每个子任务分布在不同实例上,我们可以把状态理解为某个算子子任务在其当前实例上的一个变量,变量记录了数据流的历史信息。当新数据流入时,我们可以结合历史信息来进行计算。实际上,Flink的状态是由算子的子任务来创建和管理的。一个状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。一个简单的例子是对一个时间窗口内输入流的某个整数字段求和,那么当算子子任务接收到新元素时,会获取已经存储在状态中的数值,然后将当前输入加到状态上,并将状态数据更新。

89a2877f9737a446f20bab3a0588ebc9.png

获取和更新状态的逻辑其实并不复杂,但流处理框架还需要解决以下几类问题:

  • 数据的产出要保证实时性,延迟不能太高。
  • 需要保证数据不丢不重,恰好计算一次,尤其是当状态数据非常大或者应用出现故障需要恢复时,要保证状态的计算不出任何错误。
  • 一般流处理任务都是7*24小时运行的,程序的可靠性非常高。

基于上述要求,我们不能将状态直接交由内存管理,因为内存的容量是有限制的,当状态数据稍微大一些时,就会出现内存不够的问题。假如我们使用一个持久化的备份系统,不断将内存中的状态备份起来,当流处理作业出现故障时,需要考虑如何从备份中恢复。而且,大数据应用一般是横向分布在多个节点上,流处理框架需要保证横向的伸缩扩展性。可见,状态的管理并不那么容易。

作为一个计算框架,Flink提供了有状态的计算,封装了一些底层的实现,比如状态的高效存储、Checkpoint和Savepoint持久化备份机制、计算资源扩缩容等问题。因为Flink接管了这些问题,开发者只需调用Flink API,这样可以更加专注于业务逻辑。

Flink的几种状态类型

Managed State和Raw State

Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。从名称中也能读出两者的区别:Managed State是由Flink管理的,Flink帮忙存储、恢复和优化,Raw State是开发者自己管理的,需要自己序列化。

3c8b203a9cd614c0d795c1f5d91dbc58.png

两者的具体区别有:

  • 从状态管理的方式上来说,Managed State由Flink Runtime托管,状态是自动存储、自动恢复的,Flink在存储管理和持久化上做了一些优化。当我们横向伸缩,或者说我们修改Flink应用的并行度时,状态也能自动重新分布到多个并行实例上。Raw State是用户自定义的状态。
  • 从状态的数据结构上来说,Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。Raw State只支持字节,任何上层数据结构需要序列化为字节数组。使用时,需要用户自己序列化,以非常底层的字节数组形式存储,Flink并不知道存储的是什么样的数据结构。
  • 从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。

下文将重点介绍Managed State。

Keyed State和Operator State

对Managed State继续细分,它又有两种类型:Keyed State和Operator State。这里先简单对比两种状态,后续还将展示具体的使用方法。

Keyed State是KeyedStream上的状态。假如输入流按照id为Key进行了keyBy分组,形成一个KeyedStream,数据流中所有id为1的数据共享一个状态,可以访问和更新这个状态,以此类推,每个Key对应一个自己的状态。下图展示了Keyed State,因为一个算子子任务可以处理一到多个Key,算子子任务1处理了两种Key,两种Key分别对应自己的状态。

dee2d86600b4e6605e44d056202751d3.png

Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。下图展示了Operator State,算子子任务1上的所有数据可以共享第一个Operator State,以此类推,每个算子子任务上的数据共享自己的状态。

e41f98ca49716fb91bc485fbf4f8d0f0.png

无论是Keyed State还是Operator State,Flink的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。

在之前各算子的介绍中曾提到,为了自定义Flink的算子,我们可以重写Rich Function接口类,比如RichFlatMapFunction。使用Keyed State时,我们也可以通过重写Rich Function接口类,在里面创建和访问状态。对于Operator State,我们还需进一步实现CheckpointedFunction接口。

Keyed StateOperator State适用算子类型只适用于KeyedStream上的算子可以用于所有算子状态分配每个Key对应一个状态一个算子子任务对应一个状态创建和访问方式重写Rich Function,通过里面的RuntimeContext访问实现CheckpointedFunction等接口横向扩展状态随着Key自动在多个算子子任务上迁移有多种状态重新分配的方式支持的数据结构ValueState、ListState、MapState等ListState、BroadcastState等

b25cf5ce7e503f03f694b40cb346de5b.png

上表总结了Keyed State和Operator State的区别。

横向扩展问题

状态的横向扩展问题主要是指修改Flink应用的并行度,确切的说,每个算子的并行实例数或算子子任务数发生了变化,应用需要关停或启动一些算子子任务,某份在原来某个算子子任务上的状态数据需要平滑更新到新的算子子任务上。其实,Flink的Checkpoint就是一个非常好的在各算子间迁移状态数据的机制。算子的本地状态将数据生成快照(snapshot),保存到分布式存储(如HDFS)上。横向伸缩后,算子子任务个数变化,子任务重启,相应的状态从分布式存储上重建(restore)。

130aa0241008f5d6315cab93f6b68aab.png

对于Keyed State和Operator State这两种状态,他们的横向伸缩机制不太相同。由于每个Keyed State总是与某个Key相对应,当横向伸缩时,Key总会被自动分配到某个算子子任务上,因此Keyed State会自动在多个并行子任务之间迁移。对于一个非KeyedStream,流入算子子任务的数据可能会随着并行度的改变而改变。如上图所示,假如一个应用的并行度原来为2,那么数据会被分成两份并行地流入两个算子子任务,每个算子子任务有一份自己的状态,当并行度改为3时,数据流被拆成3支,或者并行度改为1,数据流合并为1支,此时状态的存储也相应发生了变化。对于横向伸缩问题,Operator State有两种状态分配方式:一种是均匀分配,另一种是将所有状态合并,再分发给每个实例上。

Keyed State的使用方法

对于Keyed State,Flink提供了几种现成的数据结构供我们使用,包括ValueState、ListState等,他们的继承关系如下图所示。首先,State主要有三种实现,分别为ValueState、MapState和AppendingState,AppendingState又可以细分为ListState、ReducingState和AggregatingState。

f71149845c438ba1b12fe073ca085e72.png

这几个状态的具体区别在于:

  • ValueState[T]是单一变量的状态,T是某种具体的数据类型,比如Double、String,或我们自己定义的复杂数据结构。我们可以使用value()方法获取状态,使用update(value: T)更新状态。
  • MapState[K, V]存储一个Key-Value map,其功能与Java的Map几乎相同。get(key: K)可以获取某个key下的value,put(key: K, value: V)可以对某个key设置value,contains(key: K)判断某个key是否存在,remove(key: K)删除某个key以及对应的value,entries(): java.lang.Iterable[java.util.Map.Entry[K, V]]返回MapState中所有的元素,iterator(): java.util.Iterator[java.util.Map.Entry[K, V]]返回一个迭代器。需要注意的是,MapState中的key和Keyed State的key不是同一个key。
  • ListState[T]存储了一个由T类型数据组成的列表。我们可以使用add(value: T)或addAll(values: java.util.List[T])向状态中添加元素,使用get(): java.lang.Iterable[T]获取整个列表,使用update(values: java.util.List[T])来更新列表,新的列表将替换旧的列表。
  • ReducingState[T]和AggregatingState[IN, OUT]与ListState[T]同属于MergingState[T]。与ListState[T]不同的是,ReducingState[T]只有一个元素,而不是一个列表。它的原理是新元素通过add(value: T)加入后,与已有的状态元素使用ReduceFunction合并为一个元素,并更新到状态里。AggregatingState[IN, OUT]与ReducingState[T]类似,也只有一个元素,只不过AggregatingState[IN, OUT]的输入和输出类型可以不一样。ReducingState[T]和AggregatingState[IN, OUT]与窗口上进行ReduceFunction和AggregateFunction很像,都是将新元素与已有元素做聚合。

注意,Flink的核心代码目前使用Java实现的,而Java的很多类型与Scala的类型不太相同,比如List和Map。这里不再详细解释Java和Scala的数据类型的异同,但是开发者在使用Scala调用这些接口,比如状态的接口,需要注意将Java的类型转为Scala的类型。对于List和Map的转换,只需要需要引用import scala.collection.JavaConversions._,并在必要的地方添加后缀asScala或asJava来进行转换。此外,Scala和Java的空对象使用习惯不太相同,Java一般使用null表示空,Scala一般使用None。

之前的文章中其实已经多次使用过状态,这里再次使用电商用户行为分析来演示如何使用状态。我们知道电商平台会将用户与商品的交互行为收集记录下来,行为数据主要包括几个字段:userId、itemId、categoryId、behavior和timestamp。其中userId和itemId分别代表用户和商品的唯一ID,categoryId为商品类目ID,behavior表示用户的行为类型,包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav)等,timestamp记录行为发生时间。本文采用阿里巴巴提供的一个淘宝用户行为数据集,为了精简需要,只节选了部分数据。下面的代码使用MapState[String, Int]记录某个用户某种行为出现的次数。这里读取了数据集文件,模拟了一个淘宝用户行为数据流。

/** * 用户行为 * categoryId为商品类目ID * behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav) * */case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)class MapStateFunction extends RichFlatMapFunction[UserBehavior, (Long, String, Int)] { // 指向MapState的句柄 private var behaviorMapState: MapState[String, Int] = _ override def open(parameters: Configuration): Unit = { // 创建StateDescriptor val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int]) // 通过StateDescriptor获取运行时上下文中的状态 behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor) } override def flatMap(input: UserBehavior, collector: Collector[(Long, String, Int)]): Unit = { var behaviorCnt = 1 // behavior有可能为pv、cart、fav、buy等 // 判断状态中是否有该behavior if (behaviorMapState.contains(input.behavior)) { behaviorCnt = behaviorMapState.get(input.behavior) + 1 } // 更新状态 behaviorMapState.put(input.behavior, behaviorCnt) collector.collect((input.userId, input.behavior, behaviorCnt)) }}def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(8) // 获取数据源 val sourceStream: DataStream[UserBehavior] = env .addSource(new UserBehaviorSource("state/UserBehavior-50.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() { override def extractAscendingTimestamp(userBehavior: UserBehavior): Long = { // 原始数据单位为秒,乘以1000转换成毫秒 userBehavior.timestamp * 1000 } } ) // 生成一个KeyedStream val keyedStream = sourceStream.keyBy(user => user.userId) // 在KeyedStream上进行flatMap val behaviorCountStream = keyedStream.flatMap(new MapStateFunction) behaviorCountStream.print() env.execute("state example")}class UserBehaviorSource(path: String) extends RichSourceFunction[UserBehavior] { var isRunning: Boolean = true // 输入源 var streamSource: InputStream = _ override def run(sourceContext: SourceContext[UserBehavior]): Unit = { // 从项目的resources目录获取输入 streamSource = MapStateExample.getClass.getClassLoader.getResourceAsStream(path) val lines: Iterator[String] = scala.io.Source.fromInputStream(streamSource).getLines while (isRunning && lines.hasNext) { val line = lines.next() val itemStrArr = line.split(",") val userBehavior = UserBehavior(itemStrArr(0).toLong, itemStrArr(1).toLong, itemStrArr(2).toInt, itemStrArr(3), itemStrArr(4).toLong) sourceContext.collect(userBehavior) } } override def cancel(): Unit = { streamSource.close() isRunning = false }}

Keyed State是针对KeyedStream的状态,必须先对一个DataStream进行keyBy操作。在本例中,我们对用户ID进行了keyBy,那么用户ID为1的行为数据共享同一状态数据,以此类推,每个用户ID的行为数据共享自己的状态数据。之后,我们需要实现Rich类函数,比如RichFlatMapFunction,或者KeyedProcessFunction等函数类。这些算子函数类都是RichFunction的一种实现,他们都有运行时上下文RuntimeContext,RuntimeContext包含了状态数据。在实现这些算子函数类时,一般是在open方法中声明状态。open是算子的初始化方法,它在实际处理函数之前调用。具体到状态的使用,我们首先要注册一个StateDescriptor。从名字中可以看出,StateDescriptor是状态的一种描述,它描述了状态的名字和状态的数据结构。状态的名字可以用来区分不同的状态,一个算子内可以有多个不同的状态,每个状态的StateDescriptor需要设置不同的名字。同时,我们也需要指定状态的具体数据结构,指定具体的数据结构非常重要,因为Flink要对其进行序列化和反序列化,以便进行Checkpoint和必要的恢复。数据结构的类型和序列化机制可以参考我之前的文章:Flink进阶教程:数据类型和序列化机制简介。在本例中,我们使用val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int])注册了一个MapStateStateDescriptor,key为某种行为,如pv、buy等,数据类型为String,value为该行为出现的次数,数据类型为Int。此外,每种类型的状态都有对应的StateDescriptor,比如MapStateDescriptor对应MapState,ValueStateDescriptor对应ValueState。

接着我们通过StateDescriptor向RuntimeContext中获取状态句柄。本例中对应的代码为:behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor)。状态句柄并不存储状态,它只是Flink提供的一种访问状态的接口,状态数据实际存储在State Backend中。

使用和更新状态发生在实际的处理函数上,比如RichFlatMapFunction中的flatMap方法,在实现自己的业务逻辑时访问和修改状态,比如通过get方法获取状态。

其他类型的状态使用方法与本例中所展示的大致相同。ReducingState和AggregatingState在注册StateDescriptor时,还需要实现一个ReduceFunction或AggregationFunction。下面的代码注册ReducingStateDescriptor时实现一个YourReduceFunction,YourReduceFunction实现了ReduceFunction。我们在ReducingState中使用add(in: T)方法向状态里增加一个元素,新元素和状态中已有数据通过ReduceFunction两两聚合。AggregatingState的使用方法与之类似。

val reducingStateDescriptor = new ReducingStateDescriptor[UserBehavior]("reducing", new YourReduceFunction, classOf[UserBehavior])

必要时候,我们还需要调用Keyed State中的clear()方法来清除一个Keyed State。

Operator List State的使用方法

状态从本质上来说,是Flink算子子任务的一种本地数据,为了保证数据可恢复性,使用Checkpoint机制来将状态数据持久化输出到存储空间上。状态相关的主要逻辑有两项:一、将算子子任务本地内存数据在Checkpoint时snapshot写入存储;二、初始化或重启应用时,以一定的逻辑从存储中读出并变为算子子任务的本地内存数据。Keyed State对这两项内容做了更完善的封装,开发者可以开箱即用。对于Operator State来说,每个算子子任务管理自己的Operator State,或者说每个算子子任务上的数据流共享同一个状态,可以访问和修改该状态。Flink的算子子任务上的数据在程序重启、横向伸缩等场景下不能保证百分百的一致性。换句话说,重启Flink应用后,某个数据流元素不一定会和上次一样,还能流入该算子子任务上。因此,我们需要根据自己的业务场景来设计snapshot和restore的逻辑。为了实现这两个步骤,Flink提供了最为基础的CheckpointedFunction接口类。

public interface CheckpointedFunction { // Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化void snapshotState(FunctionSnapshotContext context) throws Exception; // 初始化时会调用这个方法,向本地状态中填充数据void initializeState(FunctionInitializationContext context) throws Exception;}

在Flink的Checkpoint机制下,当一次snapshot触发后,snapshotState会被调用,将本地状态持久化到存储空间上。这里我们可以先不用关心snapshot是如何被触发的,暂时理解成snapshot是自动触发的,后续文章会介绍Flink的Checkpoint机制。initializeState在算子子任务初始化时被调用,初始化包括两种场景:一、整个Flink作业第一次执行,状态数据被初始化为一个默认值;二、Flink作业重启,之前的作业已经将状态输出到存储,通过这个方法将存储上的状态读出并填充到这个本地状态中。

目前Operator State主要有三种,其中ListState和UnionListState在数据结构上都是一种ListState,还有一种BroadcastState。这里我们主要介绍ListState这种列表形式的状态。这种状态以一个列表的形式序列化并存储,以适应横向扩展时状态重分布的问题。每个算子子任务有零到多个状态S,组成一个列表ListState[S]。各个算子子任务将自己状态列表的snapshot到存储,整个状态逻辑上可以理解成是将这些列表连接到一起,组成了一个包含所有状态的大列表。当作业重启或横向扩展时,我们需要将这个包含所有状态的列表重新分布到各个算子子任务上。ListState和UnionListState的区别在于:ListState是将整个状态列表按照round-ribon的模式均匀分布到各个算子子任务上,每个算子子任务得到的是整个列表的子集;UnionListState按照广播的模式,将整个列表发送给每个算子子任务。

Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。这里我们来看一个Flink官方提供的Sink案例以了解CheckpointedFunction的工作原理。

// BufferingSink需要继承SinkFunction以实现其Sink功能,同时也要继承CheckpointedFunction接口类class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)] with CheckpointedFunction { // Operator List State句柄 @transient private var checkpointedState: ListState[(String, Int)] = _ // 本地缓存 private val bufferedElements = ListBuffer[(String, Int)]() // Sink的核心处理逻辑,将上游数据value输出到外部系统 override def invoke(value: (String, Int), context: Context): Unit = { // 先将上游数据缓存到本地的缓存 bufferedElements += value // 当本地缓存大小到达阈值时,将本地缓存输出到外部系统 if (bufferedElements.size == threshold) { for (element

上面的代码在输出到Sink之前,先将数据放在本地缓存中,并定期进行snapshot,这实现了批量输出的功能,批量输出能够减少网络等开销。同时,程序能够保证数据一定会输出外部系统,因为即使程序崩溃,状态中存储着还未输出的数据,下次启动后还会将这些未输出数据读取到内存,继续输出到外部系统。

注册和使用Operator State的代码和Keyed State相似,也是先注册一个StateDescriptor,并指定状态名字和数据类型,然后从FunctionInitializationContext中获取OperatorStateStore,进而获取ListState。如果是UnionListState,那么代码改为:context.getOperatorStateStore.getUnionListState。

val descriptor = new ListStateDescriptor[(String, Long)]( "buffered-elements", TypeInformation.of(new TypeHint[(String, Long)]() {}))checkpointedState = context.getOperatorStateStore.getListState(descriptor)

状态的初始化逻辑中,我们用context.isRestored来判断是否为作业重启,这样可以从之前的Checkpoint中恢复并写到本地缓存中。

注意,CheckpointedFunction接口类的initializeState方法的参数为FunctionInitializationContext,基于这个上下文参数我们不仅可以通过getOperatorStateStore获取OperatorStateStore,也可以通过getKeyedStateStore来获取KeyedStateStore,进而通过getState、getMapState等方法获取Keyed State,比如:context.getKeyedStateStore().getState(valueDescriptor)。这与在Rich函数类中使用Keyed State的方式并不矛盾。CheckpointedFunction是Flink有状态计算的最底层接口,它提供了最丰富的状态接口。

ListCheckpointed接口类是CheckpointedFunction接口类的一种简写,ListCheckpointed提供的功能有限,只支持均匀分布的ListState,不支持全量广播的UnionListState。

public interface ListCheckpointed {// Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化List snapshotState(long checkpointId, long timestamp) throws Exception; // 从上次Checkpoint中恢复数据到本地内存void restoreState(List state) throws Exception;}

跟CheckpointedFunction中的snapshotState方法一样,这里的snapshotState也是在做备份,但这里的参数列表更加精简,其中checkpointId是一个单调递增的数字,用来表示某次Checkpoint,timestamp是Checkpoint发生的实际时间,这个方法以列表形式返回需要写入存储的状态。restoreState方法用来初始化状态,包括作业第一次启动或者作业失败重启。参数是一个列表形式的状态,是均匀分布给这个算子子任务的状态数据。



推荐阅读
  • MySQL 5.7 学习指南:SQLyog 中的主键、列属性和数据类型
    本文介绍了 MySQL 5.7 中主键(Primary Key)和自增(Auto-Increment)的概念,以及如何在 SQLyog 中设置这些属性。同时,还探讨了数据类型的分类和选择,以及列属性的设置方法。 ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 字节流(InputStream和OutputStream),字节流读写文件,字节流的缓冲区,字节缓冲流
    字节流抽象类InputStream和OutputStream是字节流的顶级父类所有的字节输入流都继承自InputStream,所有的输出流都继承子OutputStreamInput ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • 在《Cocos2d-x学习笔记:基础概念解析与内存管理机制深入探讨》中,详细介绍了Cocos2d-x的基础概念,并深入分析了其内存管理机制。特别是针对Boost库引入的智能指针管理方法进行了详细的讲解,例如在处理鱼的运动过程中,可以通过编写自定义函数来动态计算角度变化,利用CallFunc回调机制实现高效的游戏逻辑控制。此外,文章还探讨了如何通过智能指针优化资源管理和避免内存泄漏,为开发者提供了实用的编程技巧和最佳实践。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 应用链时代,详解 Avalanche 与 Cosmos 的差异 ... [详细]
  • window下的python安装插件,Go语言社区,Golang程序员人脉社 ... [详细]
  • com.hazelcast.config.MapConfig.isStatisticsEnabled()方法的使用及代码示例 ... [详细]
author-avatar
搁浅几世琉璃
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有