我将这个问题开始: 如何使用Scala的API的Iteratee
将文件上传到云存储 (Azure的Blob存储在我的情况,但我不认为这是现在最重要的)
背景:
我需要大块的投入大约1 MB块存储大型媒体文件(300 MB +)作为Azure的BlockBlobs
。 不幸的是,我的斯卡拉知识仍然很差(我的项目是基于Java和斯卡拉在它唯一的用途将是一个上传控制器)。
我试着用这个代码: 为什么叫使得错误或在BodyParser的Iteratee完成请求挂在游戏框架2.0? (作为Input
Iteratee
) -它工作得很好,但每个Element
,我可以用具有8192个字节大小,所以它的发送几百MB的文件到云太小。
我必须说,这是一个相当新的方法给我,并极有可能我误解的东西(不想告诉我误解一切;>)
我欢迎任何提示或链接,这将有助于我与该主题。 如果有类似的使用的任何样品这将是我的想法是最好的选择。
基本上你需要首先什么是rechunk输入作为更大的块,1024个* 1024字节。
首先,让我们有一个Iteratee
,将消耗多达字节的100万(确定有最后一块小)
val consumeAMB =
Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()
利用这一点,我们可以构造一个Enumeratee
(适配器),将重新集结块,使用称为分组的API:
val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
Enumeratee.grouped(consumeAMB)
这里编组使用一个Iteratee
以确定有多少就摆在每个块。 它采用了我们的consumeAMB了点。 这意味着结果是一个Enumeratee
该rechunks输入到Array[Byte]
1MB的。
现在,我们需要写BodyParser
,它将使用Iteratee.foldM
方法发送的字节每块:
val writeToStore: Iteratee[Array[Byte],_] =
Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) =>
// write bytes and return next handle, probable in a Future
}
foldM沿着传递的状态,并用它在其传递函数(S,Input[Array[Byte]]) => Future[S]
返回状态的新前景。 直到foldM不会再次调用该函数Future
完成,有输入的可用块。
与身体解析器将rechunking输入,将其推进店:
BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))
返回一个右表明您被人体解析(这恰好是这里的处理程序)结束返回身体。
如果你的目标是流式传输到S3,在这里,我已经实现并测试一个帮手:
def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
(implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = {
import scala.collection.JavaConversions._
val initRequest = new InitiateMultipartUploadRequest(bucket, key)
val initResponse = s3.initiateMultipartUpload(initRequest)
val uploadId = initResponse.getUploadId
val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped {
Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
}
val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) =>
val uploadRequest = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withPartNumber(etags.length + 1)
.withUploadId(uploadId)
.withInputStream(new ByteArrayInputStream(bytes))
.withPartSize(bytes.length)
val etag = Future { s3.uploadPart(uploadRequest).getPartETag }
etag.map(etags :+ _)
}
val futETags = enum &> rechunker |>>> uploader
futETags.map { etags =>
val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
s3.completeMultipartUpload(compRequest)
}.recoverWith { case e: Exception =>
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
Future.failed(e)
}
}
以下添加到您的配置文件
play.http.parser.maxMemoryBuffer = 256K
对于那些谁也试图,而不是写一个全新的BodyParser找出此流问题的解决方案,你也可以用什么已经在实施parse.multipartFormData 。 您可以实现类似下面覆盖默认的处理程序handleFilePartAsTemporaryFile。
def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = {
handleFilePart {
case FileInfo(partName, filename, contentType) =>
(rechunkAdapter &>> writeToS3).map {
_ =>
val compRequest = new CompleteMultipartUploadRequest(...)
amazonS3Client.completeMultipartUpload(compRequest)
...
}
}
}
def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)
我能够做这个工作,但我现在还不能确定整个上传过程是否流。 我尝试了一些大的文件,似乎S3仅上载启动时,整个文件已经从客户端发送。
我看着上面的解析器实现,我觉得一切都使用Iteratee连接,应将文件流。 如果有人有这方面的一些见解,那将是非常有益的。