Socket connection and ActorSystem

Posted 2019-08-09 00:26

Question:

I have an application that uses Akka, and now I want to connect to it via a socket connection. Therefore I use a mechanism similar to the one from the Scala page. But when I send a message with tell (!) while I have an open OutputStream, the target does not receive it.

Here is my source code:

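// Imports assume Akka 2.0.x; on Akka 2.1 or later the duration DSL
// comes from scala.concurrent.duration._ instead of akka.util.duration._
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import java.net.{ServerSocket, Socket, SocketException}

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.util.Timeout
import akka.util.duration._
import com.typesafe.config.ConfigFactory

object Connector {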

  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))
    val master = system.actorOf(Props[TestActor])
    master ! "a"

    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
        new ConnectionThread(listener accept, master).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      system.shutdown
    }
  }
}

case class ConnectionThread(socket: Socket, master: ActorRef) 
  extends Thread("ConnectionThread") {

  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)

  master ! "b"

  override def run {
    master ! "c"
    try{
      master ! "d"
      val in = new ObjectInputStream(socket getInputStream)
      master ! "e"
      val out = new ObjectOutputStream(socket getOutputStream)

      out writeObject("listening")
      out flush

      master ! "f"
      val command = in.readObject.asInstanceOf[String]
      println("client sent: '" + command + "'")
      // process the command

      master ! "g"
      out.writeObject("EOF")
      out.flush

      out.close
      in.close
      socket.close
    } catch {
      case e: SocketException =>
      case e: IOException => e printStackTrace
    }
  }
}

class TestActor extends Actor with ActorLogging{

  log info("TestActor running")

  def receive = {
    case s: String =>
      log info("received: " + s)
  }

}

I get the output:

listening on port: 1337
[INFO] TestActor running
[INFO] received: a
[INFO] received: b
[INFO] received: c
[INFO] received: d

I expected it to continue through g, but instead I get:

client sent: 'select content from testdata on 2012-07-06'

I found that it works until I open a stream on the socket, probably because tell and ask are socket-based as well and use the output stream of the socket that the thread runs in. Afterwards the socket connection works, but I am not able to send any message to the actor system.
There is no way for me to drop the Connector and the ConnectionThread. How can I fix it?

Answer 1:

I must admit that I did not completely understand the example from the documentation. But I found that using a ConnectionHelper instead of addressing the ActorRef directly works pretty well.
I changed my code to the following:

object Connector {

  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))

    //    val master = system.actorOf(Props[TestActor], "master")
    //    master ! "a"

    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
      //        new ConnectionThread(listener accept, master.asInstanceOf[TestActor]).start
        new ConnectionThread(listener accept, system).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      //      master ! PoisonPill
      system.shutdown
    }
  }

}

case class ConnectionThread(socket: Socket, sys: ActorSystem) 
  extends Thread("ConnectionThread") {

  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)
  private val conHelper = new ConnectionHelper

  override def run {
    try {
      val out = new ObjectOutputStream(socket getOutputStream)
      val in = new ObjectInputStream(socket getInputStream)

      conHelper tell "funzt"
      out writeObject ("Hi")
      out.flush
      val command = in.readObject.asInstanceOf[String]
      println("received: " + command)
      out writeObject ("test")
      out.flush
      out writeObject ("EOF")
      out.flush

      out.close
      in.close
      socket.close
    }
  }

  private class ConnectionHelper {
    val tester = sys.actorOf(Props[TestActor])

    def tell(s: String) { tester ! s }

  }

}

I don't really understand why this works and the code from my question does not. I welcome all explanations.
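
One visible difference between the two listings, apart from the ConnectionHelper, is the order in which the object streams are opened: the question creates the ObjectInputStream first, while the code above creates the ObjectOutputStream first. Whether that accounts for the missing log lines is not established here. It is documented JDK behaviour, though, that the ObjectInputStream constructor blocks until it has read the header written by the peer's ObjectOutputStream, so opening the output stream first and flushing it is the safer order. The following is only a minimal sketch of that ordering using plain java.io and java.net; StreamOrderDemo, openStreams and roundTrip are hypothetical names, and no Akka is involved.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.net.{InetAddress, ServerSocket, Socket}

// Hypothetical demo, not part of the original code: shows the
// "output stream first, flush, then input stream" ordering on both peers.
object StreamOrderDemo {

  // The ObjectOutputStream constructor writes the stream header, and the
  // ObjectInputStream constructor blocks until it has read the peer's header.
  // Creating and flushing the output stream first avoids both sides waiting.
  def openStreams(socket: Socket): (ObjectOutputStream, ObjectInputStream) = {
    val out = new ObjectOutputStream(socket.getOutputStream)
    out.flush() // push our header to the peer before waiting for theirs
    val in = new ObjectInputStream(socket.getInputStream)
    (out, in)
  }

  // Runs one request/response exchange over the loopback interface and
  // returns the string the client read back from the server.
  def roundTrip(command: String): String = {
    // Port 0 lets the operating system pick any free port.
    val listener = new ServerSocket(0, 1, InetAddress.getLoopbackAddress)
    val server = new Thread("DemoServer") {
      override def run(): Unit = {
        val socket = listener.accept()
        try {
          val (out, in) = openStreams(socket)
          val received = in.readObject().asInstanceOf[String]
          out.writeObject("echo: " + received)
          out.flush()
        } finally socket.close()
      }
    }
    server.start()

    val client = new Socket(InetAddress.getLoopbackAddress, listener.getLocalPort)
    try {
      val (out, in) = openStreams(client)
      out.writeObject(command)
      out.flush()
      in.readObject().asInstanceOf[String]
    } finally {
      client.close()
      server.join()
      listener.close()
    }
  }

  def main(args: Array[String]): Unit =
    println(roundTrip("select content from testdata on 2012-07-06"))
}
```

Both ends call the same openStreams helper, so neither side can end up waiting for a header that the other has not yet sent. The server simply echoes the command back with an "echo: " prefix.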