热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FunDA(12)-示范:强类型数据源-strongtypeddatasources

FunDA设计的主要目的是解决FRM(FunctionalRelationMapping)如Slick这样的批次型操作工具库数据源行间游动操作的缺失问题。FRM产生的结果集就是一种静态集合

    FunDA设计的主要目的是解决FRM(Functional Relation Mapping)如Slick这样的批次型操作工具库数据源行间游动操作的缺失问题。FRM产生的结果集就是一种静态集合,缺乏动态更新操作方式。FunDA提出的解决方案是把FRM产生的静态集合转变成动态流(stream),流内元素代表数据行(data row),一个完整的数据流代表一连串的数据行。用户可以利用数据流和FunDA提供的函数组件在数据流中游动进行数据更新操作。FunDA的数据流只支持单向游动(fda_next),但FunDA的数据流支持多种类型的数据元素,包括:数据行(data row)和指令行(action row)。指令行ActionRow是由Slick-DBIOAction构成,可以发送回后台数据库更新数据。FunDA可以通过函数组件从数据行中产生新数据行或者指令行并且在数据流的任何位置运算用户提供的功能函数,使其能使用该位置的数据行进行数据更新或者数据(指令)行产生操作。我们将在下面几个章节进行FunDA功能的使用示范。

    Slick运算Query返回的结果集合内的数据行类型一般是Tuple类型。因为无法使用字段名,是弱类型。除了从方便使用角度考虑,还因为FunDA开发是基于Scala函数式编程模式的,静态类型系统(static type system)对类型要求比较严格,所以FunDA的数据流内元素必须是强类型的,大部分是case class类型。这样用户可以使用名称来调用数据字段来进行数据处理编程。下面我们就示范一下如何把Slick的数据结果集合转变成强类型数据流:

从世界银行公开数据网站下载了一份美国州县空气质量报告原始数据,cvs格式的,30万条左右。导入h2数据库后作为示范数据。下面是示范数据表结构:

import slick.driver.H2Driver.api._

object Models {

//表字段对应模版
case class AQMRawModel(mid: String
, mtype: String
, state: String
, fips: String
, county: String
, year: String
, value: String)

//表结构: 定义字段类型, * 代表结果集字段
class AQMRawTable(tag: Tag) extends Table[AQMRawModel](tag, "AIRQM") {
def mid = column[String]("MEASUREID")
def mtype = column[String]("MEASURETYPE")
def state = column[String]("STATENAME")
def fips = column[String]("COUNTYFIPS")
def county = column[String]("COUNTYNAME")
def year = column[String]("REPORTYEAR")
def value = column[String]("VALUE")

def * = (mid,mtype,state,fips,county,year,value) <> (AQMRawModel.tupled, AQMRawModel.unapply)
}

//库表实例
val AQMRawQuery = TableQuery[AQMRawTable]

}

下面是这个示范软件的sbt设置文件build.sbt:

name := "funda-demo"

version := "1.0"

scalaVersion := "2.11.8"

resolvers += Resolver.mavenLocal

libraryDependencies ++= Seq(
"com.typesafe.slick" %% "slick" % "3.1.1",
"com.typesafe.slick" %% "slick-testkit" % "3.1.1" % "test",
"org.slf4j" % "slf4j-nop" % "1.7.21",
"com.h2database" % "h2" % "1.4.191",
"com.typesafe.slick" %% "slick-hikaricp" % "3.1.1",
"com.bayakala" % "funda_2.11" % "1.0.0-SNAPSHOT" withSources() withJavadoc()
)

数据库设置在前面Slick系列讨论里已经示范过了。在这里就不再多说了。

强类型转换可以在读取数据库时进行,生成强类型元素的数据流。或者在使用数据流时即时转换。我们先看看如何构建强类型元素数据流:

  val aqmraw = Models.AQMRawQuery

val db = Database.forConfig("h2db")
// aqmQuery.result returns Seq[(String,String,String,String)]
val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
// user designed strong typed resultset type. must extend FDAROW
case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
// strong typed resultset conversion function. declared implicit to remind during compilation
implicit def toTypedRow(row: (String,String,String,String)): TypedRow =
TypedRow(row._1,row._2,row._3,row._4)


在读取数据库前用户提供强类型结构case class TypedRow, 及Seq[(...)]到TypeRow类型转换函数toTypedRow,如上。在构建数据读取工具类FDAViewLoader时提供这个转换函数:

// loader to read from database and convert result collection to strong typed collection
val viewLoader = FDAViewLoader(slick.driver.H2Driver)(toTypedRow _)
val dataSeq = viewLoader.fda_typedRows(aqmQuery.result)(db).toSeq

现在这个dataSeq是Seq[TypedRow]类型了。用dataSeq构建静态数据流:

// turn Seq collection into fs2 stream
val aqmStream = fda_staticSource(dataSeq)()()

fd_staticSource是基于bracket函数的资源使用模式:

  /**
* produce a static view source from a Seq[ROW] collection using famous 'bracket'
* provide facade to error handling and cleanup
* @param acquirer the Seq[ROW] collection
* @param errhandler error handle callback
* @param finalizer cleanup callback
* @tparam ROW type of row
* @return a new stream
*/
def fda_staticSource[ROW](acquirer: => Seq[ROW])(
errhandler: Throwable => FDAPipeLine[ROW] = null)(
finalizer: => Unit = ()): FDAPipeLine[ROW] = {...}

上面的调用省略了异常和事后处理。下面的例子示范了完整的调用:

  val safeSource = fda_staticSource(dataSeq) {
case e: Exception => fda_appendRow(FDAErrorRow(new Exception(e)))
}(println("the end finally!"))

在这个调用例子里如果出现异常,新数据流状态是一个代表异常的元素类型。无论在正常完成或中断情况下都会显示“the end finally!“信息。

aqmStream是个一个以TypedRow为元素的强类型数据流。我们可以在组件函数里使用字段名:

  // use stream combinators with field names
aqmStream.filter{r => r.year > "1999"}.take(3).appendTask(showRecord).startRun

当然我们也可以在用户定义的任务FDAUserTask函数中调用字段名:

// now access fields in the strong typed resultset
def showRecord: FDAUserTask[FDAROW] = row => {
row match {
case qmr: TypedRow =>
println(s"州名: ${qmr.state}")
println(s"县名:${qmr.county}")
println(s"年份:${qmr.year}")
println(s"取值:${qmr.value}")
println("-------------")
fda_skip
case _ => fda_skip
}
}

运算aqmStream得出以下结果:

州名: Ohio
县名:Stark
年份:2013
取值:0
-------------
州名: New Mexico
县名:Lea
年份:2002
取值:0
-------------
州名: Texas
县名:Bowie
年份:2003
取值:0
-------------

Process finished with exit code 0

我们也可以先构建一个弱类型数据流后再用map来把它转换成强类型的,如下:

  val allState = aqmraw.map(_.state)
val stateLoader = FDAViewLoader[String,String](slick.driver.H2Driver)()
val stateSeq = stateLoader.fda_plainRows(allState.distinct.result)(db).toSeq
val stateStream = fda_staticSource(stateSeq)()()
case class StateRow(state: String) extends FDAROW
def showState: FDAUserTask[FDAROW] = row => {
row match {
case StateRow(sname) =>
println(s"州名称:$sname")
fda_skip
case _ => fda_skip
}
}
stateStream.map{s => StateRow(s)}
.filter{r => r.state > "Alabama"}.take(3)
.appendTask(showState).startRun

allState返回结果类型Seq[String]。注意如果没有提供类型转换函数来辅助类型推导就必须在构建FDAViewLoader时提供SOURCE和TARGET类型参数。stateStream是一个弱类型的数据流,我们用map{s => StateRow(s))把流元素转换成StateRow类型。运算stateStream结果为:

州名称:North Dakota
州名称:Maryland
州名称:Louisiana

Process finished with exit code 0

上面的示范例子我们可以用Reactive-Streams方式实现,如下:

  val streamLoader = FDAStreamLoader(slick.driver.H2Driver)(toTypedRow _)
val streamSource = streamLoader.fda_typedStream(aqmQuery.result)(db)(
10.seconds,512,512)()()
streamSource.filter{r => r.year > "1999"}.take(3).appendTask(showRecord).startRun

val stateStreamLoader = FDAStreamLoader[String,String](slick.driver.H2Driver)()
val stateStreamSource = stateStreamLoader.fda_plainStream(allState.distinct.result)(db)(
10.seconds,512,512)()()

//first convert to StateRows to turn Stream[Task,FDAROW] typed stream
stateStreamSource.map{s => StateRow(s)}
.filter{r => r.state > "Alabama"}.take(3)
.appendTask(showState).startRun
}

fda_typeStream产生强类型元素的数据流。它的函数款式是这样的:

   /**
* returns a reactive-stream from Slick DBIOAction result
* using play-iteratees and fs2 queque to connect to slick data stream publisher
* provide facade for error handler and finalizer to support exception and cleanup handling
* also provide stream element conversion from SOURCE type to TARGET type
* @param action a Slick DBIOAction to produce query results
* @param slickDB Slick database object
* @param maxInterval max time wait on iteratee to consume of next element
* exceeding presumed streaming failure or completion
* use 0.milli to represent infinity
* inform enumerator to release its resources
* @param fetchSize number of rows cached during database read
* @param queSize size of queque used by iteratee as cache to pass elements to fs2 stream
* @param errhandler error handler callback
* @param finalizer cleanup callback
* @param convert just a measure to guarantee conversion function is defined
* when this function is used there has to be a converter defined
* implicitly in compile time
* @return a reactive-stream of TARGET row type elements
*/
def fda_typedStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(
slickDB: Database)(
maxInterval: FiniteDuration, fetchSize: Int, queSize: Int)(
errhandler: Throwable => FDAPipeLine[TARGET] = null)(
finalizer: => Unit = ())(
implicit convert: SOURCE => TARGET): FDAPipeLine[TARGET] = {...}

注意maxInterval,fetchSize,queSize这几个参数的用途。上面这个streaming的示范例子产生相同结果。

下面是示范源代码:

import slick.driver.H2Driver.api._
import com.bayakala.funda._
import API._
import scala.language.implicitConversions
import scala.concurrent.duration._

object StrongTypedSource extends App {

val aqmraw = Models.AQMRawQuery

val db = Database.forConfig("h2db")
// aqmQuery.result returns Seq[(String,String,String,String)]
val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
// user designed strong typed resultset type. must extend FDAROW
case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
// strong typed resultset conversion function. declared implicit to remind during compilation
implicit def toTypedRow(row: (String,String,String,String)): TypedRow =
TypedRow(row._1,row._2,row._3,row._4)
// loader to read from database and convert result collection to strong typed collection
val viewLoader = FDAViewLoader(slick.driver.H2Driver)(toTypedRow _)
val dataSeq = viewLoader.fda_typedRows(aqmQuery.result)(db).toSeq
// turn Seq collection into fs2 stream
val aqmStream = fda_staticSource(dataSeq)()()
// now access fields in the strong typed resultset
def showRecord: FDAUserTask[FDAROW] = row => {
row match {
case qmr: TypedRow =>
println(s"州名: ${qmr.state}")
println(s"县名:${qmr.county}")
println(s"年份:${qmr.year}")
println(s"取值:${qmr.value}")
println("-------------")
fda_skip
case _ => fda_skip
}
}
// use stream combinators with field names
aqmStream.filter{r => r.year > "1999"}.take(3).appendTask(showRecord).startRun

val allState = aqmraw.map(_.state)
//no converter to help type inference. must provide type parameters explicitly
val stateLoader = FDAViewLoader[String,String](slick.driver.H2Driver)()
val stateSeq = stateLoader.fda_plainRows(allState.distinct.result)(db).toSeq
//constructed a Stream[Task,String]
val stateStream = fda_staticSource(stateSeq)()()
//strong typed row type. must extend FDAROW
case class StateRow(state: String) extends FDAROW
def showState: FDAUserTask[FDAROW] = row => {
row match {
case StateRow(sname) =>
println(s"州名称:$sname")
fda_skip
case _ => fda_skip
}
}
//first convert to StateRows to turn Stream[Task,FDAROW] typed stream
stateStream.map{s => StateRow(s)}
.filter{r => r.state > "Alabama"}.take(3)
.appendTask(showState).startRun


val streamLoader = FDAStreamLoader(slick.driver.H2Driver)(toTypedRow _)
val streamSource = streamLoader.fda_typedStream(aqmQuery.result)(db)(
10.seconds,512,512)()()
streamSource.filter{r => r.year > "1999"}.take(3).appendTask(showRecord).startRun

val stateStreamLoader = FDAStreamLoader[String,String](slick.driver.H2Driver)()
val stateStreamSource = stateStreamLoader.fda_plainStream(allState.distinct.result)(db)(
10.seconds,512,512)()()

//first convert to StateRows to turn Stream[Task,FDAROW] typed stream
stateStreamSource.map{s => StateRow(s)}
.filter{r => r.state > "Alabama"}.take(3)
.appendTask(showState).startRun
}




推荐阅读
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • IOS开发之短信发送与拨打电话的方法详解
    本文详细介绍了在IOS开发中实现短信发送和拨打电话的两种方式,一种是使用系统底层发送,虽然无法自定义短信内容和返回原应用,但是简单方便;另一种是使用第三方框架发送,需要导入MessageUI头文件,并遵守MFMessageComposeViewControllerDelegate协议,可以实现自定义短信内容和返回原应用的功能。 ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文讨论了在手机移动端如何使用HTML5和JavaScript实现视频上传并压缩视频质量,或者降低手机摄像头拍摄质量的问题。作者指出HTML5和JavaScript无法直接压缩视频,只能通过将视频传送到服务器端由后端进行压缩。对于控制相机拍摄质量,只有使用JAVA编写Android客户端才能实现压缩。此外,作者还解释了在交作业时使用zip格式压缩包导致CSS文件和图片音乐丢失的原因,并提供了解决方法。最后,作者还介绍了一个用于处理图片的类,可以实现图片剪裁处理和生成缩略图的功能。 ... [详细]
  • 树莓派语音控制的配置方法和步骤
    本文介绍了在树莓派上实现语音控制的配置方法和步骤。首先感谢博主Eoman的帮助,文章参考了他的内容。树莓派的配置需要通过sudo raspi-config进行,然后使用Eoman的控制方法,即安装wiringPi库并编写控制引脚的脚本。具体的安装步骤和脚本编写方法在文章中详细介绍。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 如何搭建Java开发环境并开发WinCE项目
    本文介绍了如何搭建Java开发环境并开发WinCE项目,包括搭建开发环境的步骤和获取SDK的几种方式。同时还解答了一些关于WinCE开发的常见问题。通过阅读本文,您将了解如何使用Java进行嵌入式开发,并能够顺利开发WinCE应用程序。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 本文讨论了如何使用Web.Config进行自定义配置节的配置转换。作者提到,他将msbuild设置为详细模式,但转换却忽略了带有替换转换的自定义部分的存在。 ... [详细]
  • 本文介绍了PHP常量的定义和使用方法,包括常量的命名规则、大小写敏感性、全局范围和标量数据的限制。同时还提到了应尽量避免定义resource常量,并给出了使用define()函数定义常量的示例。 ... [详细]
author-avatar
米字格时光
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有