假设我需要在平行给定文件夹处理文件。 在Java中我会创造一个FolderReader
线程从文件夹和池读取文件名FileProcessor
线程。 FolderReader
读取文件的名称和提交的文件处理功能( Runnable
)到池中执行。
在斯卡拉我看到两个选项:
- 创建池
FileProcessor
演员和安排与文件处理功能Actors.Scheduler
。 - 为每个文件名创建一个演员在读取文件名。
是否有意义? 什么是最好的选择吗?
假设我需要在平行给定文件夹处理文件。 在Java中我会创造一个FolderReader
线程从文件夹和池读取文件名FileProcessor
线程。 FolderReader
读取文件的名称和提交的文件处理功能( Runnable
)到池中执行。
在斯卡拉我看到两个选项:
FileProcessor
演员和安排与文件处理功能Actors.Scheduler
。 是否有意义? 什么是最好的选择吗?
我用我所有的精力都建议尽可能你可以从线程保持。 幸运的是,我们有更好的抽象,其照顾什么下面发生的事情,和你的情况看来,我认为你不需要使用演员(而你可以),但你可以使用一个简单的抽象,称为期货。 他们是阿卡开源库的一部分,我认为在未来将是Scala的标准库的一部分。
-未来[T]很简单的东西,将在未来返回吨。
所有你需要运行一个未来,是有一个隐含的ExecutionContext,你可以从一个Java执行服务获得。 然后,你就可以享受优雅的API和一个事实,即未来转变成收藏品期货的集合,收集结果等单子。 我建议你给看看,以http://doc.akka.io/docs/akka/2.0.1/scala/futures.html
object TestingFutures {
implicit val executorService = Executors.newFixedThreadPool(20)
implicit val executorContext = ExecutionContext.fromExecutorService(executorService)
def testFutures(myList:List[String]):List[String]= {
val listOfFutures : Future[List[String]] = Future.traverse(myList){
aString => Future{
aString.reverse
}
}
val result:List[String] = Await.result(listOfFutures,1 minute)
result
}
}
有很多事情在这里:
Future.traverse
其接收作为其是第一参数M[T]<:Traversable[T]
和作为第二参数的T => Future[T]
或者如果你喜欢一个Function1[T,Future[T]]
并返回未来[M [T]] Future.apply
方法创建匿名类类型的Future[T]
还有许多其他的原因看阿卡期货。
期货可以映射,因为它们是单子,即你可以链期货执行:
Future { 3 }.map { _ * 2 }.map { _.toString }
期货有所回调:future.onComplete,的onSuccess,onFailure处,andThen等。
期货不仅支持移动,但也为理解
根据你在做什么,它可能是那样简单
for(file<-files.par){
//process the file
}
理想情况下,你应该使用两个演员。 一个用于读取文件的列表,以及一个用于实际读取文件。
您只需发送一个“开始”消息,第一男主角启动进程。 然后,演员可以读取文件的列表,并发送消息到第二男主角。 第二个演员,然后读取该文件并处理的内容。
拥有多个角色,这可能看起来很复杂,其实是在你有一堆与海誓山盟通信,就像在一个理论OO系统对象的感觉是一件好事。
编辑:你真的不应该做这样一个文件的并发读取。
我正想写了什么@ Edmondo1984做,除了他打我给它。 :)我第二次他的一大途径建议。 我还会建议你阅读的文档阿卡2.0.2 。 同时,我给你一个稍微具体的例子:
import akka.dispatch.{ExecutionContext, Future, Await}
import akka.util.duration._
import java.util.concurrent.Executors
import java.io.File
val execService = Executors.newCachedThreadPool()
implicit val execContext = ExecutionContext.fromExecutorService(execService)
val tmp = new File("/tmp/")
val files = tmp.listFiles()
val workers = files.map { f =>
Future {
f.getAbsolutePath()
}
}.toSeq
val result = Future.sequence(workers)
result.onSuccess {
case filenames =>
filenames.foreach { fn =>
println(fn)
}
}
// Artificial just to make things work for the example
Thread.sleep(100)
execContext.shutdown()
这里我用sequence
,而不是traverse
,但不同的是要依赖于你的需求。
去与未来,我的朋友; 该演员就是在这种情况下更痛苦的方式。
但是,如果使用的演员,有什么不好呢?
如果我们要读/写的一些属性文件。 还有就是我的Java示例。 但仍与阿卡演员。
免得说,我们有一个演员ActorFile
代表一个文件。 嗯..也许这并不能代表一个文件。 对? (将是很好的它可以)。 现在,它代表了一些文件,如PropertyFilesActor
则:
为什么不使用这样的:
public class PropertyFilesActor extends UntypedActor {
Map<String, String> filesContent = new LinkedHashMap<String, String>();
{ // here we should use real files of cource
filesContent.put("file1.xml", "");
filesContent.put("file2.xml", "");
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof WriteMessage) {
WriteMessage writeMessage = (WriteMessage) message;
String content = filesContent.get(writeMessage.fileName);
String newContent = content + writeMessage.stringToWrite;
filesContent.put(writeMessage.fileName, newContent);
}
else if (message instanceof ReadMessage) {
ReadMessage readMessage = (ReadMessage) message;
String currentContent = filesContent.get(readMessage.fileName);
// Send the current content back to the sender
getSender().tell(new ReadMessage(readMessage.fileName, currentContent), getSelf());
}
else unhandled(message);
}
}
...消息将与参数去(文件名)
它有自己in-box
,接受邮件喜欢:
这些信息将被存储到所述in-box
的顺序,一个后antoher。 存储/读取,同时发送反馈-演员会从盒子接收消息完成其工作sender ! message
sender ! message
回来了。
因此,我们如果我们写的属性文件,并发送出网页上的内容说。 我们可以开始展示页面(右后我们发送的邮件存储数据的文件),只要我们收到的反馈,与数据从刚刚更新文件(AJAX)更新页面的一部分。
好了,抓住你的文件,并在并行结构把它们粘
scala> new java.io.File("/tmp").listFiles.par
res0: scala.collection.parallel.mutable.ParArray[java.io.File] = ParArray( ... )
然后...
scala> res0 map (_.length)
res1: scala.collection.parallel.mutable.ParArray[Long] = ParArray(4943, 1960, 4208, 103266, 363 ... )