玩2.X:无文件上传与Iteratees(Play 2.x : Reactive file uplo

2019-06-27 00:15发布

我将这个问题开始: 如何使用Scala的API的Iteratee将文件上传到云存储 (Azure的Blob存储在我的情况,但我不认为这是现在最重要的)

背景:

我需要大块的投入大约1 MB块存储大型媒体文件(300 MB +)作为Azure的BlockBlobs 。 不幸的是,我的斯卡拉知识仍然很差(我的项目是基于Java和斯卡拉在它唯一的用途将是一个上传控制器)。

我试着用这个代码: 为什么叫使得错误或在BodyParser的Iteratee完成请求挂在游戏框架2.0? (作为Input Iteratee ) -它工作得很好,但每个Element ,我可以用具有8192个字节大小,所以它的发送几百MB的文件到云太小。

我必须说,这是一个相当新的方法给我,并极有可能我误解的东西(不想告诉我误解一切;>)

我欢迎任何提示或链接,这将有助于我与该主题。 如果有类似的使用的任何样品这将是我的想法是最好的选择。

Answer 1:

基本上你需要首先什么是rechunk输入作为更大的块,1024个* 1024字节。

首先,让我们有一个Iteratee ,将消耗多达字节的100万(确定有最后一块小)

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()

利用这一点,我们可以构造一个Enumeratee (适配器),将重新集结块,使用称为分组的API:

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

这里编组使用一个Iteratee以确定有多少就摆在每个块。 它采用了我们的consumeAMB了点。 这意味着结果是一个Enumeratee该rechunks输入到Array[Byte] 1MB的。

现在,我们需要写BodyParser ,它将使用Iteratee.foldM方法发送的字节每块:

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

foldM沿着传递的状态,并用它在其传递函数(S,Input[Array[Byte]]) => Future[S]返回状态的新前景。 直到foldM不会再次调用该函数Future完成,有输入的可用块。

与身体解析器将rechunking输入,将其推进店:

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

返回一个右表明您被人体解析(这恰好是这里的处理程序)结束返回身体。



Answer 2:

如果你的目标是流式传输到S3,在这里,我已经实现并测试一个帮手:

def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
                (implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = {
  import scala.collection.JavaConversions._

  val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  val initResponse = s3.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped {
    Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
  }

  val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) =>
    val uploadRequest = new UploadPartRequest()
      .withBucketName(bucket)
      .withKey(key)
      .withPartNumber(etags.length + 1)
      .withUploadId(uploadId)
      .withInputStream(new ByteArrayInputStream(bytes))
      .withPartSize(bytes.length)

    val etag = Future { s3.uploadPart(uploadRequest).getPartETag }
    etag.map(etags :+ _)
  }

  val futETags = enum &> rechunker |>>> uploader

  futETags.map { etags =>
    val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
    s3.completeMultipartUpload(compRequest)
  }.recoverWith { case e: Exception =>
    s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
    Future.failed(e)
  }

}


Answer 3:

以下添加到您的配置文件

play.http.parser.maxMemoryBuffer = 256K



Answer 4:

对于那些谁也试图,而不是写一个全新的BodyParser找出此流问题的解决方案,你也可以用什么已经在实施parse.multipartFormData 。 您可以实现类似下面覆盖默认的处理程序handleFilePartAsTemporaryFile。

def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = {
  handleFilePart {
    case FileInfo(partName, filename, contentType) =>

      (rechunkAdapter &>> writeToS3).map {
        _ =>
          val compRequest = new CompleteMultipartUploadRequest(...)
          amazonS3Client.completeMultipartUpload(compRequest)
          ...
      }
  }
}

def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)

我能够做这个工作,但我现在还不能确定整个上传过程是否流。 我尝试了一些大的文件,似乎S3仅上载启动时,整个文件已经从客户端发送。

我看着上面的解析器实现,我觉得一切都使用Iteratee连接,应将文件流。 如果有人有这方面的一些见解,那将是非常有益的。



文章来源: Play 2.x : Reactive file upload with Iteratees