Scala - getting a callback when an external proces

2020-02-28 05:59发布

问题:

I'm looking to replace a lot of my perl with scala. One of the things I tend to do a lot is call binaries (usually compiled C++, but could be java, other perl scripts, q scripts, etc) provided to me by other teams at my company.

For example, to do some complex math, I'd start one of the foreign binaries, then pipe my inputs to it. I'd then listen to its stdout stream for results, and stderr stream for diagnostic messages. In perl, I'd do this using a POE::Wheel::Run widget. I've come up with something analogous (and much nicer) in scala, but I'd like to make it more robust. It's a little wrapper around a ProcessIO object. It looks like this:

class Exe(command: String, out: String => Unit, err: String => Unit) {

    import scala.sys.process._
    import scala.io._
    import java.io._
    import scala.concurrent._

    val inputStream = new SyncVar[OutputStream];

    val process = Process(command).run(
        new ProcessIO(
            stdin => inputStream.put(stdin),
            stdout => Source.fromInputStream(stdout).getLines.foreach(out),
            stderr => Source.fromInputStream(stderr).getLines.foreach(err)));

    def write(s: String): Unit = synchronized {
        inputStream.get.write((s + "\n").getBytes)
    }

    def close(): Unit = {
        inputStream.get.close
    }
}

I'd then use it like this:

val exe = new Exe("tr [a-z] [A-Z]", 
                  out => println("o: " + out), 
                  err => println("e: " + err))
exe.write("lower")
exe.close()

Which prints out:

o: LOWER

This gets me 90% there, but what would be nice would be to get a callback when the process exits. It may exit because I've closed the input stream and its internal loop stops, it may exit on its own, or it may exit because I've killed it. In the callback, it would be good to know why it stopped, and the exit code.

I'm at a bit of a loss as to how to go about this, any help would be appreciated (and any edits to the above code are, of course, welcome - I'm a bit of a noob).

I'm using 2.9.0.1

回答1:

You can wait for the end of a process calling exitValue. You might do that in a separate thread, in which the callback will happen. Maybe class Process could be pimped thus:

import scala.concurrent.ops.spawn
implicit def ProcessWithCallback(p: Process) {
  def whenTerminatedDo(callback: Int => Unit) = spawn{
    val exitValue = p.exitValue; callback(p)
  }
}

You could then use that in Exe as you like.

The Process class given by the JVM and wrapped by scala.sys.Process is really rather feable, it will be hard not to block a thread



回答2:

updated version using spawn to create a new thread that blocks and waits for the exit code

class Exe(command:String, out:String=>Unit, err:String=>Unit, onExit:Int=>Unit) {

    import scala.sys.process._
    import scala.io._
    import java.io._
    import scala.concurrent._
    import scala.concurrent.ops.spawn

    val inputStream = new SyncVar[OutputStream];

    val process = Process(command).run(
        new ProcessIO(
            stdin => inputStream.put(stdin),
            stdout => Source.fromInputStream(stdout).getLines.foreach(out),
            stderr => Source.fromInputStream(stderr).getLines.foreach(err)));

    spawn { onExit(process.exitValue()) }

    def write(s:String):Unit = synchronized {
        inputStream.get.write((s + "\n").getBytes)
    }

    def close():Unit = {
        inputStream.get.close
    }
}

can use like this

import java.util.concurrent.CountDownLatch

val latch = new CountDownLatch(1)

val exe = new Exe("tr [a-z] [A-Z]",
        out => println("o: " + out), 
        err => println("e: " + err), 
        code=> {println(code) ; latch.countDown() })
exe.write("lower") 
exe.close()

latch.await

prints

o: LOWER
0

thanks everyone!



回答3:

Have you considered spawning a new thread which will then call the blocking method process.exitValue()? You can then call your callback.



标签: scala