热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink1.1和storm0.10API实例对比

flink1.1和storm0.10API实例对比本来标题想叫flink和stormAPI实例对比,发现它们最近API都变更频繁,就加上了版本号,storm在1.0版本后增加了IW
flink 1.1 和 storm 0.10 API 实例对比

本来标题想叫 flink 和 storm API 实例对比,发现它们最近 API 都变更频繁,
就加上了版本号,storm 在 1.0 版本后增加了 IWindowedBolt 接口,不过我还没有试用,
之后可能会补上.

需求说明

一份 Kafka 日志记录着用户在不同平台的访问时间,uuid,一条信息的例子如下

{
"platform": 1,
"time": 1470267515,
"uuid": "ad751bb3-d0ee-c9b9-be26-2ba4570bb3fe",
}

我们需要统计不同平台每天的uv情况,并且有如下要求.

  1. 每秒钟要输出最新的统计结果

  2. 程序永远跑着不会停,所以要定期清理内存里的过时数据

  3. 收到的消息里的时间字段并不是按照顺序严格递增的,所以要有一定的容错机制

  4. 访问uv并不一定每秒钟都会变化,重复输出对IO是巨大的浪费,所以要在uv变更时在一秒
    内输出结果,未变更时不输出

Storm 方案

输入

输入数据使用KafkaSpout, Storm 有现成的 KafkaSpout, 加上配置文件即可使用.

val hosts = new ZkHosts("zkhosts")
val spoutCOnfig= new SpoutConfig(hosts, "sometopic", "zkroot", "xxx")
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)
spoutConfig.startOffsetTime = OffsetRequest.LatestTime
val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))

计算结果

创建一个KeeperBolt,用Map和Set保存当前结果,一条日志对应的Set要通过日志时间对应
的日期和平台取得

class KeeperBolt extends BaseBasicBolt {
val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
var lastSize = Map.empty[LocalDate, Map[Int, Int]]
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
input.getSourceStreamId match {
case "default" =>
val jsObj = Json.parse(input.getString(0))
val time = (jsObj \ "time").as[Int]
val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
val platform = (jsObj \ "platform").as[Int]
val uuid = (jsObj \ "uuid").as[String]
val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
set.add(uuid.hashCode)

定期清理旧数据&输出当前结果

Storm处理这方面的内容比较麻烦,为了不破坏每个Storm Task单线程的特性,我们不应该直接
在Bolt里起一个Timer做这件事情,否则就要加上不必要的锁,一来影响性能和增加程序的复杂度,
二来自创线程也会影响storm对Topology运行状态/性能的评估,所以需要通过另外建立Spout的
方式发送消息让Bolt能够收到清空消息的作用.

class CommandSpout extends BaseRichSpout {
var collecor: SpoutOutputCollector = _
var commands = Vector.empty[String]
override def nextTuple(): Unit = {
commands.synchronized {
while (commands.nonEmpty) {
println("emit: " + commands.head)
collecor.emit("command", new Values(commands.head), Random.nextInt())
commands = commands.tail
}
}
}
override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
this.collecor = collector
val timer = new Timer
timer.scheduleAtFixedRate(new TimerTask {
override def run(): Unit = {
commands.synchronized {
commands +:= "output"
}
}
}, 1000, 1000)
timer.scheduleAtFixedRate(new TimerTask {
override def run(): Unit = {
commands.synchronized {
commands +:= "retain"
}
}
}, 86400, 86400)
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
declarer.declareStream("command", new Fields("command"))
}
}

KeeperBolt收到消息以后,要执行相应的动作,包括输出和清理旧数据.
输出方面:
需要单独建立一个LastSize类维持之前输出时候的结果,将现有的map中每个uv的数量
和上次输出的uv比较,只输出发生变化的内容.
清理旧数据:
KeeperBolt增添了一个变量叫做beginDate,当beginDate每日更新以后,
一方面将map中日期早于beginDate的数据删除,
另一方面,之后接收消息的过程中会做过滤.

class KeeperBolt(var beginDate: LocalDate = LocalDate.now().minusDays(2)) extends BaseBasicBolt {
val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
var lastSize = Map.empty[LocalDate, Map[Int, Int]]
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
input.getSourceStreamId match {
case "default" =>
val jsObj = Json.parse(input.getString(0))
val time = (jsObj \ "time").as[Int]
val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
if (date.compareTo(beginDate) > 0) {
val platform = (jsObj \ "platform").as[Int]
val uuid = (jsObj \ "uuid").as[String]
val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
set.add(uuid.hashCode)
}
case "command" =>
val command = input.getString(0) match {
case "output" =>
val currSize: Map[LocalDate, Map[Int, Int]] = map.map {
case (key, submap) => (key, submap.map {
case (platform, set) => (platform, set.size)
}.toMap)
}.toMap
println("currSize = " + currSize)
println("lastSize = " + lastSize)
for ((date, subMap) <- currSize; (platform, uv) <- subMap) {
lastSize.get(date).flatMap(_.get(platform)) match {
case Some(lastuv) if lastuv == uv => //不做任何事
case _ =>
println("updated date,platform,uv = " + (date, platform, uv))
}
}
lastSize = currSize
case "retain" =>
beginDate = LocalDate.now().minusDays(2)
}
}
}

构建 Topology

每个节点工作完成以后,我们需要编码把节点连接起来.
总共用到两个Spout和一个Bolt,两个Spout分别负责输出日志和发送命令,
一个Bolt存储数据和订阅两个Spout

val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
builder.setSpout("command-spout", new CommandSpout)
builder.setBolt("keeper", new KeeperBolt)
.allGrouping("command-spout", "command")
.localOrShuffleGrouping("kafka-spout")
val cOnf= new Config
conf.setNumWorkers(1)
val topo = builder.createTopology()
val cluster = new LocalCluster()
StormSubmitter.submitTopologyWithProgressBar("Boom", conf, topo)

这样一个计算每天不同平台uv输出到,定时清理旧数据,永远运行的应用就完成了.
完整代码

import java.time.{Instant, LocalDate, ZoneId}
import java.util
import java.util.{Timer, TimerTask}
import kafka.api.OffsetRequest
import org.apache.storm.kafka.{KafkaSpout, SpoutConfig, StringScheme, ZkHosts}
import org.apache.storm.spout.{SchemeAsMultiScheme, SpoutOutputCollector}
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.base.{BaseBasicBolt, BaseRichSpout}
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer, TopologyBuilder}
import org.apache.storm.tuple.{Fields, Tuple, Values}
import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import play.api.libs.json.Json
import scala.collection.mutable
import scala.util.Random
object StormMain {
class CommandSpout extends BaseRichSpout {
var collecor: SpoutOutputCollector = _
var commands = Vector.empty[String]
override def nextTuple(): Unit = {
commands.synchronized {
while (commands.nonEmpty) {
println("emit: " + commands.head)
collecor.emit("command", new Values(commands.head), Random.nextInt())
commands = commands.tail
}
}
}
override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
this.collecor = collector
val timer = new Timer
timer.scheduleAtFixedRate(new TimerTask {
override def run(): Unit = {
commands.synchronized {
commands +:= "output"
}
}
}, 1000, 1000)
timer.scheduleAtFixedRate(new TimerTask {
override def run(): Unit = {
commands.synchronized {
commands +:= "retain"
}
}
}, 86400, 86400)
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
declarer.declareStream("command", new Fields("command"))
}
}
class KeeperBolt(var beginDate: LocalDate = LocalDate.now().minusDays(2)) extends BaseBasicBolt {
val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
var lastSize = Map.empty[LocalDate, Map[Int, Int]]
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
input.getSourceStreamId match {
case "default" =>
val jsObj = Json.parse(input.getString(0))
val time = (jsObj \ "time").as[Int]
val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
if (date.compareTo(beginDate) > 0) {
val platform = (jsObj \ "platform").as[Int]
val uuid = (jsObj \ "uuid").as[String]
val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
set.add(uuid.hashCode)
}
case "command" =>
val command = input.getString(0) match {
case "output" =>
val currSize: Map[LocalDate, Map[Int, Int]] = map.map {
case (key, submap) => (key, submap.map {
case (platform, set) => (platform, set.size)
}.toMap)
}.toMap
println("currSize = " + currSize)
println("lastSize = " + lastSize)
for ((date, subMap) <- currSize; (platform, uv) <- subMap) {
lastSize.get(date).flatMap(_.get(platform)) match {
case Some(lastuv) if lastuv == uv => //不做任何事
case _ =>
println("updated date,platform,uv = " + (date, platform, uv))
}
}
lastSize = currSize
case "retain" =>
beginDate = LocalDate.now().minusDays(2)
}
}
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
}
}
def main(args: Array[String]): Unit = {
val hosts = new ZkHosts("xxx")
val spoutCOnfig= new SpoutConfig(hosts, "xxx", "/xxx", "xxx")
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)
spoutConfig.startOffsetTime = OffsetRequest.LatestTime
val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
builder.setSpout("command-spout", new CommandSpout)
builder.setBolt("keeper", new KeeperBolt)
.allGrouping("command-spout", "command")
.localOrShuffleGrouping("kafka-spout")
val cOnf= new Config
conf.setNumWorkers(1)
val topo = builder.createTopology()
val cluster = new LocalCluster()
StormSubmitter.submitTopologyWithProgressBar("Boom", conf, topo)
}
}

潜在问题

  1. Storm方案的API非常底层,像定时任务这样的事情就需要单独建立一个Spout,代码体量已经与
    核心业务逻辑相当,非常繁琐.

  2. 将数据按照时间进行隔离(或者说滑动窗口)功能也需要和业务逻辑交织在一起,
    当需要更改程序的时候人需要多花一些经历找到想更改的代码片段.

  3. 每个节点做什么事情,节点间如何连接都需要人为指定.当一个陌生人刚接手代码的时候,
    他很难通过读一小段核心代码就了解大概,需要通读上下文,在不同类之间多次跳转才能理解.

Flink 方案

Flink针对以上情况作出了改进, API 的抽象程度进行了提高,并且针对常见的使用场景提供了
很多实用工具,下面就是 Flink 实现这个需求的例子,可配合官方文档阅读.

输入

和 Storm 一样,Flink 标配了 Kafka 客户端,配置上参数就能使用了

val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx")
properties.setProperty("zookeeper.connect", "xxx")
properties.setProperty("group.id", "xxx")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setBufferTimeout(100)
val cOnsumer= new FlinkKafkaConsumer08[JsObject]("order_refer_v6.1", new DeserializationSchema[JsObject] {
override def isEndOfStream(t: JsObject): Boolean = false
override def deserialize(bytes: Array[Byte]): JsObject = {
Json.parse(new String(bytes)).as[JsObject]
}
override def getProducedType: TypeInformation[JsObject] = TypeInformation.of(classOf[JsObject])
}, properties)

定期清理旧数据和定时输出

Flink提供了Window和Watermark两个工具,可以将定时需求和业务逻辑隔离开来,
所以我们先讲这个.

Watermark:
Watermark 是 Flink中用来记录数据进度的工具,一旦 Watermark 超过设定的某个时间
窗口尾端了, Flink 就认为一个时间窗口已经过时了,不应该再被改变,就会将时间窗口
对应的内容从内存中剔除,达到一个新陈代谢的作用.
在下面的代码例子中,由于内存比较宽裕,日志的乱序程度又可能比较大,我并不是用日志中的
时间作为生成Watermark的依据,而是使用系统时间,将三天内的数据全都保留在内存中,当然
timestamp还是从日志得到的,因为timestamp要作为区分Window的依据,如果用系统时间来
生成timestamp就会产生因为程序启动时间不同而产生不同结果的问题了.

consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[JsObject] {
override def getCurrentWatermark: Watermark = {
new Watermark(System.currentTimeMillis() - 86400l * 1000 * 3)
}
override def extractTimestamp(t: JsObject, l: Long): LOng= {
(t \ "time").as[Long] * 1000
}
})

Window:
Watermark设置完成后,使用Window功能非常简单,加两句话就可以了

env.addSource(consumer)
.keyBy(_.\("platform").as[Int])
.timeWindow(Time.days(1))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))

timeWindow(Time.days(1)) 表示把数据按天进行分隔,后面还要带一个
trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
是因为,虽然我们关心的是每天的总结果,但是不希望在一天结束的时候才看到这个结果,
而是每秒钟都能获得最新结果的输出.

业务逻辑

窗口的事情处理完了,现在可以真的开始处理业务逻辑了,Flink中的数据流经过Window处理后
叫做WindowedStream,WindowedStream有一个fold方法,可以将一个Window中的数据进行
聚合产生新的数据,但是除此之外,我们还想知道数据所在窗口的起始时间和结束时间,
这就需要用到apply方法,这相当于fold方法的一个扩展,在进行聚合的同时还能获得窗口信息.

env.addSource(consumer)
.keyBy(_.\("platform").as[Int])
.timeWindow(Time.days(1))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
.applyWith((0, Set.empty[Int], 0l, 0l))(
foldFunction = {
case ((_, set, _, 0), jsObj) =>
val platform = (jsObj \ "platform").as[Int]
val uuid = (jsObj \ "uuid").as[String]
(platform, set + uuid.hashCode, 0, 0)
},
windowFunction = {
case (key, window, results) =>
results.map {
case (platform, set, _, _) =>
(platform, set, window.getStart, window.getEnd)
}
}
)

现在我们拥有了最新的 (平台,集合,窗口起始时间,窗口结束时间),但是一方面我们不需要集合
的内容,只需要了解它最新的大小信息,另一方面集合的大小未必每秒都会改变,为了减少不必要
的输出,我们要在连续两条相同的日志里去掉一条,一般集合的map/flatMap方法做不到这一点,
但是Flink额外提供了一个flatMapWithState方法,可以根据数据流中之前的数据执行不同的
逻辑,下面就是对数据流中的重复数据进行去重的例子.

env.addSource(consumer)
.keyBy(_.\("platform").as[Int])
.timeWindow(Time.days(1))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
.applyWith((0, Set.empty[Int], 0l, 0l))(
foldFunction = {
case ((_, set, _, 0), jsObj) =>
val platform = (jsObj \ "platform").as[Int]
val uuid = (jsObj \ "uuid").as[String]
(platform, set + uuid.hashCode, 0, 0)
},
windowFunction = {
case (key, window, results) =>
results.map {
case (platform, set, _, _) =>
(platform, set, window.getStart, window.getEnd)
}
}
)
.mapWith {
case (key, set, windowStart, windowEnd) =>
(key, set.size, windowStart, windowEnd)
}
.keyBy(0)
.flatMapWithState[(Int, Int, Long, Long), Int] {
case ((key, num, begin, end), curr) =>
curr match {
case Some(numCurr) if numCurr == num =>
(Seq.empty, Some(num)) //如果之前已经有相同的数据,则返回空结果
case _ =>
(Seq((key, num, begin, end)), Some(num))
}
}

这样一来我们就可以做到,数据不更新时不输出结果,数据更新时最快速度输出最新结果.

完整代码

import java.util.Properties
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import play.api.libs.json.{JsObject, Json}
object FlinkMain {
def main(args: Array[String]): Unit = {
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx")
properties.setProperty("zookeeper.connect", "xxx")
properties.setProperty("group.id", "xxx")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setBufferTimeout(100)
val cOnsumer= new FlinkKafkaConsumer08[JsObject]("order_refer_v6.1", new DeserializationSchema[JsObject] {
override def isEndOfStream(t: JsObject): Boolean = false
override def deserialize(bytes: Array[Byte]): JsObject = {
Json.parse(new String(bytes)).as[JsObject]
}
override def getProducedType: TypeInformation[JsObject] = TypeInformation.of(classOf[JsObject])
}, properties)
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[JsObject] {
override def getCurrentWatermark: Watermark = {
new Watermark(System.currentTimeMillis() - 86400l * 1000 * 3)
}
override def extractTimestamp(t: JsObject, l: Long): LOng= {
(t \ "time").as[Long] * 1000
}
})
val stream =
env.addSource(consumer)
.keyBy(_.\("platform").as[Int])
.timeWindow(Time.days(1))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
.applyWith((0, Set.empty[Int], 0l, 0l))(
foldFunction = {
case ((_, set, _, 0), jsObj) =>
val platform = (jsObj \ "platform").as[Int]
val uuid = (jsObj \ "uuid").as[String]
(platform, set + uuid.hashCode, 0, 0)
},
windowFunction = {
case (key, window, results) =>
results.map {
case (platform, set, _, _) =>
(platform, set, window.getStart, window.getEnd)
}
}
)
.mapWith {
case (key, set, windowStart, windowEnd) =>
(key, set.size, windowStart, windowEnd)
}
.keyBy(0)
.flatMapWithState[(Int, Int, Long, Long), Int] {
case ((key, num, begin, end), curr) =>
curr match {
case Some(numCurr) if numCurr == num =>
(Seq.empty, Some(num)) //如果之前已经有相同的数据,则返回空结果
case _ =>
(Seq((key, num, begin, end)), Some(num))
}
}
stream.print()
env.execute("Boom")
}
}

和 Storm API 比较

从&#8221;让人类更轻松&#8221;这个角度而言,Flink API 很好地优化了我上面提到的几个问题,当然与此同时
你也失去了更精细化控制一些东西的能力,不过我认为就大部分的日常工作而言,让人类更轻松才是
我们应该追求的目标,利用精细控制来特别优化程序,很可能只在少数非常重要的业务上

其实可能也并没有差那么多

Storm最近的版本增加了WindowedBolt和Watermark功能,如果配合Storm一直有的Trident API
的话,应该是可以用很接近Flink API的方式写出一样的逻辑的,虽然Trident API不明原因地
似乎一直不怎么流行.

事实上,我觉得主流的流计算框架API都会变得越来越像.甚至大部分不同的框架会支持同一套API,
比如现在 Apache Beam
就同时有spark和flink的实现,我觉得最后的发展,还是要看每个框架的砸钱社区发展做得好不好.


推荐阅读
  • POJ 2482 星空中的星星:利用线段树与扫描线算法解决
    在《POJ 2482 星空中的星星》问题中,通过运用线段树和扫描线算法,可以高效地解决星星在窗口内的计数问题。该方法不仅能够快速处理大规模数据,还能确保时间复杂度的最优性,适用于各种复杂的星空模拟场景。 ... [详细]
  • 在尝试对从复杂 XSD 生成的类进行序列化时,遇到了 `NullReferenceException` 错误。尽管已经花费了数小时进行调试和搜索相关资料,但仍然无法找到问题的根源。希望社区能够提供一些指导和建议,帮助解决这一难题。 ... [详细]
  • Unity3D 中 AsyncOperation 实现异步场景加载及进度显示优化技巧
    在Unity3D中,通过使用`AsyncOperation`可以实现高效的异步场景加载,并结合进度条显示来提升用户体验。本文详细介绍了如何利用`AsyncOperation`进行异步加载,并提供了优化技巧,包括进度条的动态更新和加载过程中的性能优化方法。此外,还探讨了如何处理加载过程中可能出现的异常情况,确保加载过程的稳定性和可靠性。 ... [详细]
  • 本文深入解析了WCF Binding模型中的绑定元素,详细介绍了信道、信道管理器、信道监听器和信道工厂的概念与作用。从对象创建的角度来看,信道管理器负责信道的生成。具体而言,客户端的信道通过信道工厂进行实例化,而服务端则通过信道监听器来接收请求。文章还探讨了这些组件之间的交互机制及其在WCF通信中的重要性。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 本文探讨了如何利用Java代码获取当前本地操作系统中正在运行的进程列表及其详细信息。通过引入必要的包和类,开发者可以轻松地实现这一功能,为系统监控和管理提供有力支持。示例代码展示了具体实现方法,适用于需要了解系统进程状态的开发人员。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 在Android应用开发中,实现与MySQL数据库的连接是一项重要的技术任务。本文详细介绍了Android连接MySQL数据库的操作流程和技术要点。首先,Android平台提供了SQLiteOpenHelper类作为数据库辅助工具,用于创建或打开数据库。开发者可以通过继承并扩展该类,实现对数据库的初始化和版本管理。此外,文章还探讨了使用第三方库如Retrofit或Volley进行网络请求,以及如何通过JSON格式交换数据,确保与MySQL服务器的高效通信。 ... [详细]
  • 技术日志:使用 Ruby 爬虫抓取拉勾网职位数据并生成词云分析报告
    技术日志:使用 Ruby 爬虫抓取拉勾网职位数据并生成词云分析报告 ... [详细]
  • 在Android 4.4系统中,通过使用 `Intent` 对象并设置动作 `ACTION_GET_CONTENT` 或 `ACTION_OPEN_DOCUMENT`,可以从相册中选择图片并获取其路径。具体实现时,需要为 `Intent` 添加相应的类别,并处理返回的 Uri 以提取图片的文件路径。此方法适用于需要从用户相册中选择图片的应用场景,能够确保兼容性和用户体验。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • 图像拼接技术深入解析:基于OpenCV 3.4的Stitching模块源码分析(下篇)
    本文继续深入探讨图像拼接技术,特别是在OpenCV 3.4的Stitching模块中的源码实现。通过与VLFeat的SIFT实现进行对比,详细分析了OpenCV在图像特征提取、匹配及拼接过程中的关键算法和技术细节,为读者提供了全面的技术解析和实践指导。 ... [详细]
  • 在ElasticStack日志监控系统中,Logstash编码插件自5.0版本起进行了重大改进。插件被独立拆分为gem包,每个插件可以单独进行更新和维护,无需依赖Logstash的整体升级。这不仅提高了系统的灵活性和可维护性,还简化了插件的管理和部署过程。本文将详细介绍这些编码插件的功能、配置方法,并通过实际生产环境中的应用案例,展示其在日志处理和监控中的高效性和可靠性。 ... [详细]
  • 数字图书馆近期展出了一批精选的Linux经典著作,这些书籍虽然部分较为陈旧,但依然具有重要的参考价值。如需转载相关内容,请务必注明来源:小文论坛(http://www.xiaowenbbs.com)。 ... [详细]
author-avatar
我是你的特效
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有