Scala的并行文件处理(Parallel file processing in Scala)

2019-06-26 10:43发布

假设我需要在平行给定文件夹处理文件。 在Java中我会创造一个FolderReader线程从文件夹和池读取文件名FileProcessor线程。 FolderReader读取文件的名称和提交的文件处理功能( Runnable )到池中执行。

在斯卡拉我看到两个选项:

  • 创建池FileProcessor演员和安排与文件处理功能Actors.Scheduler
  • 为每个文件名创建一个演员在读取文件名。

是否有意义? 什么是最好的选择吗?

Answer 1:

我用我所有的精力都建议尽可能你可以从线程保持。 幸运的是,我们有更好的抽象,其照顾什么下面发生的事情,和你的情况看来,我认为你不需要使用演员(而你可以),但你可以使用一个简单的抽象,称为期货。 他们是阿卡开源库的一部分,我认为在未来将是Scala的标准库的一部分。

-未来[T]很简单的东西,将在未来返回吨。

所有你需要运行一个未来,是有一个隐含的ExecutionContext,你可以从一个Java执行服务获得。 然后,你就可以享受优雅的API和一个事实,即未来转变成收藏品期货的集合,收集结果等单子。 我建议你给看看,以http://doc.akka.io/docs/akka/2.0.1/scala/futures.html

object TestingFutures {
  implicit val executorService = Executors.newFixedThreadPool(20)
  implicit val executorContext = ExecutionContext.fromExecutorService(executorService)

  def testFutures(myList:List[String]):List[String]= {

    val listOfFutures : Future[List[String]] = Future.traverse(myList){
      aString => Future{
                        aString.reverse
                       }
     }
    val result:List[String] = Await.result(listOfFutures,1 minute)
    result

  }
}

有很多事情在这里:

  • 我使用Future.traverse其接收作为其是第一参数M[T]<:Traversable[T]和作为第二参数的T => Future[T]或者如果你喜欢一个Function1[T,Future[T]]并返回未来[M [T]]
  • 我使用的Future.apply方法创建匿名类类型的Future[T]

还有许多其他的原因看阿卡期货。

  • 期货可以映射,因为它们是单子,即你可以链期货执行:

    Future { 3 }.map { _ * 2 }.map { _.toString }

  • 期货有所回调:future.onComplete,的onSuccess,onFailure处,andThen等。

  • 期货不仅支持移动,但也为理解



Answer 2:

根据你在做什么,它可能是那样简单

for(file<-files.par){
   //process the file
}


Answer 3:

理想情况下,你应该使用两个演员。 一个用于读取文件的列表,以及一个用于实际读取文件。

您只需发送一个“开始”消息,第一男主角启动进程。 然后,演员可以读取文件的列表,并发送消息到第二男主角。 第二个演员,然后读取该文件并处理的内容。

拥有多个角色,这可能看起来很复杂,其实是在你有一堆与海誓山盟通信,就像在一个理论OO系统对象的感觉是一件好事。

编辑:你真的不应该做这样一个文件的并发读取。



Answer 4:

我正想写了什么@ Edmondo1984做,除了他打我给它。 :)我第二次他的一大途径建议。 我还会建议你阅读的文档阿卡2.0.2 。 同时,我给你一个稍微具体的例子:

import akka.dispatch.{ExecutionContext, Future, Await}
import akka.util.duration._
import java.util.concurrent.Executors
import java.io.File

val execService = Executors.newCachedThreadPool()
implicit val execContext = ExecutionContext.fromExecutorService(execService)

val tmp = new File("/tmp/")
val files = tmp.listFiles()
val workers = files.map { f =>
  Future {
    f.getAbsolutePath()
  }
}.toSeq
val result = Future.sequence(workers)
result.onSuccess {
  case filenames =>
    filenames.foreach { fn =>
      println(fn)
    }
}

// Artificial just to make things work for the example
Thread.sleep(100)
execContext.shutdown()

这里我用sequence ,而不是traverse ,但不同的是要依赖于你的需求。

去与未来,我的朋友; 该演员就是在这种情况下更痛苦的方式。




Answer 5:

但是,如果使用的演员,有什么不好呢?

如果我们要读/写的一些属性文件。 还有就是我的Java示例。 但仍与阿卡演员。

免得说,我们有一个演员ActorFile代表一个文件。 嗯..也许这并不能代表一个文件。 对? (将是很好的它可以)。 现在,它代表了一些文件,如PropertyFilesActor则:

为什么不使用这样的:

public class PropertyFilesActor extends UntypedActor {

    Map<String, String> filesContent = new LinkedHashMap<String, String>();

    { // here we should use real files of cource
        filesContent.put("file1.xml", "");
        filesContent.put("file2.xml", "");
    }

    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof WriteMessage)  {
            WriteMessage writeMessage = (WriteMessage) message;
            String content = filesContent.get(writeMessage.fileName);
            String newContent = content + writeMessage.stringToWrite;
            filesContent.put(writeMessage.fileName, newContent);
        }

        else if (message instanceof ReadMessage) {
            ReadMessage readMessage = (ReadMessage) message;
            String currentContent = filesContent.get(readMessage.fileName);
            // Send the current content back to the sender
            getSender().tell(new ReadMessage(readMessage.fileName, currentContent), getSelf());
        }

        else unhandled(message);

    }

}

...消息将与参数去(文件名)

它有自己in-box ,接受邮件喜欢:

  1. 的WriteLine(文件名,字符串)
  2. 的ReadLine(文件名,字符串)

这些信息将被存储到所述in-box的顺序,一个后antoher。 存储/读取,同时发送反馈-演员会从盒子接收消息完成其工作sender ! message sender ! message回来了。

因此,我们如果我们写的属性文件,并发送出网页上的内容说。 我们可以开始展示页面(右后我们发送的邮件存储数据的文件),只要我们收到的反馈,与数据从刚刚更新文件(AJAX)更新页面的一部分。



Answer 6:

好了,抓住你的文件,并在并行结构把它们粘

scala> new java.io.File("/tmp").listFiles.par
res0: scala.collection.parallel.mutable.ParArray[java.io.File] = ParArray( ... )

然后...

scala> res0 map (_.length)
res1: scala.collection.parallel.mutable.ParArray[Long] = ParArray(4943, 1960, 4208, 103266, 363 ... )


文章来源: Parallel file processing in Scala
标签: scala akka actor