Streaming data to S3 with Iteratees and Enumerators in Play Scala

 无敌鸟的秋天 posted on 2022-12-04 04:04

I'm building a Play Framework application in Scala and I want to stream an array of bytes to S3. I'm using the Play-S3 library to do this. The "Multipart file upload" section of the documentation is what's relevant here:

// Retrieve an upload ticket
val result:Future[BucketFileUploadTicket] =
  bucket initiateMultipartUpload BucketFile(fileName, mimeType)

// Upload the parts and save the tickets
val result:Future[BucketFilePartUploadTicket] =
  bucket uploadPart (uploadTicket, BucketFilePart(partNumber, content))

// Complete the upload using both the upload ticket and the part upload tickets
val result:Future[Unit] =
  bucket completeMultipartUpload (uploadTicket, partUploadTickets)

I'm trying to do the same thing in my application, but with Iteratees and Enumerators.

The streaming and asynchronicity make things a bit more complicated, but here is what I have so far (note that uploadTicket is defined earlier in the code):

val partNumberStream = Stream.iterate(1)(_ + 1).iterator
val partUploadTicketsIteratee =
  Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](
    Future.successful(Vector.empty[BucketFilePartUploadTicket])
  ) { (partUploadTickets, bytes) =>
    bucket
      .uploadPart(uploadTicket, BucketFilePart(partNumberStream.next(), bytes))
      .flatMap(partUploadTicket => partUploadTickets.map(_ :+ partUploadTicket))
  }
(body |>>> partUploadTicketsIteratee).andThen {
  case result =>
    result.map(_.map(partUploadTickets => bucket.completeMultipartUpload(uploadTicket, partUploadTickets))) match {
      case Success(x) => x.map(d => println("Success"))
      case Failure(t) => throw t
    }
}

Everything compiles and runs without incident. In fact, "Success" gets printed, but no file ever shows up on S3.

1 Answer
  • There might be multiple problems with your code. It's a bit unreadable because of the map method calls, and your future composition might be wrong. Another problem could be the fact that all chunks (except for the last one) should be at least 5MB.
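    To illustrate just the future-composition point (separate from the iteratee pipeline below), here is a minimal, self-contained sketch. `fakeUploadPart` is a hypothetical stand-in for `bucket.uploadPart` that simply yields the part number as its ticket; the fold chains each upload through the accumulator future so no ticket is dropped:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for bucket.uploadPart: yields the part number as a "ticket"
def fakeUploadPart(partNumber: Int, bytes: Array[Byte]): Future[Int] =
  Future.successful(partNumber)

val chunks = Seq(Array[Byte](1), Array[Byte](2), Array[Byte](3))

// Chain the uploads through the accumulator future so every ticket is kept,
// and each upload only starts after the previous one has finished
val tickets: Future[Vector[Int]] =
  chunks.zipWithIndex.foldLeft(Future.successful(Vector.empty[Int])) {
    case (acc, (bytes, index)) =>
      for {
        collected <- acc
        ticket    <- fakeUploadPart(index + 1, bytes)
      } yield collected :+ ticket
  }

println(Await.result(tickets, 5.seconds))
```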

    The code below has not been tested, but shows a different approach. The iteratee approach is one where you can create small building blocks and compose them into a pipe of operations.

    To make the code compile I added a trait and a few methods:

    trait BucketFilePartUploadTicket
    val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
    val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
    val body: Enumerator[Array[Byte]] = ???
    

    Here we create the different parts:

    // Create 5MB chunks
    val chunked = {
      val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
      Enumeratee.grouped(take5MB transform Iteratee.consume())
    }
    
    // Add a counter, used as part number later on
    val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
      case ((counter, _), bytes) => (counter + 1) -> bytes
    }
    
    // Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket
    val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled)
    
    // Construct the pipe to connect to the enumerator
    // the ><> operator is an alias for compose; it is more intuitive because of
    // its arrow-like structure
    val pipe = chunked ><> zipWithIndex ><> uploadPartTickets
    
    // Create a consumer that ends by finishing the upload
    val consumeAndComplete = 
      Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload
    

    Simply connecting the parts gives you the complete run:

    // This is the result, a Future[Unit]
    val result = body through pipe run consumeAndComplete 
    

    Note that I did not test any of this code and might have made some mistakes in my approach. It does, however, show a different way of handling the problem and should help you find a good solution.

    Keep in mind that this approach waits for one part to complete its upload before it moves on to the next part. If the connection from your server to Amazon is slower than the connection from the browser to your server, this mechanism will slow down the input.

    You could take another approach where you do not wait for the Future of a part upload to complete. This would result in an extra step where you use Future.sequence to convert the sequence of upload futures into a single future containing the sequence of results. The effect would be a mechanism that sends a part to Amazon as soon as you have enough data.
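    As a minimal, self-contained sketch of that idea (with a hypothetical `startUpload` standing in for the real part upload), `Future.sequence` turns a sequence of in-flight futures into one future of all their results, preserving order:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for a part upload that starts immediately
// and eventually yields the part number as its "ticket"
def startUpload(partNumber: Int): Future[Int] =
  Future { partNumber }

// Start all uploads up front; none waits for the previous one to finish
val inFlight: Seq[Future[Int]] = (1 to 3).map(startUpload)

// Future.sequence: Seq[Future[Int]] => Future[Seq[Int]], results in input order
val allTickets: Future[Seq[Int]] = Future.sequence(inFlight)

println(Await.result(allTickets, 5.seconds))
```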

    Answered 2022-12-11 03:11