Slick 3.0 (scala) queries don't return data ti

Posted 2019-09-10 09:23

Question:

I'm extremely new to databases, Slick, and Scala, so I'm using the example code from the Slick documentation at http://slick.typesafe.com/doc/3.0.0/gettingstarted.html

My problem is that I have to run a query multiple times before it returns data, at least 3-4 times. I use a for-loop to rerun the query, and the runs don't necessarily give me the same results each time either.

Following that example, I create two tables as follows:

  class Patients(tag: Tag) extends Table[(String, String, Int, String)](tag, "Patientss") {
    def PID = column[String]("Patient Id", O.PrimaryKey)

    def Gender = column[String]("Gender")

    def Age = column[Int]("Age")

    def Ethnicity = column[String]("Ethnicity")

    def * = (PID, Gender, Age, Ethnicity)
  }

  val patientsss = TableQuery[Patients]

  class DrugEffect(tag: Tag) extends Table[(String, String, Double)](tag, "DrugEffectss") {

    def DrugID = column[String]("Drug ID", O.PrimaryKey)

    def PatientID = column[String]("Patient_ID")

    def DrugEffectssss = column[Double]("Drug Effect")

    def * = (DrugID, PatientID, DrugEffectssss)

    def Patient = foreignKey("Patient_FK", PatientID, patientsss)(_.PID)
  }

  val d_effects = TableQuery[DrugEffect]

I then create these tables using:

  val create_empty = DBIO.seq((patientsss.schema ++ d_effects.schema).create)
  val setup_1 = db.run(create_empty)

I have actual data in two text files, which I parse using a buffered reader. I store all the drug IDs in a list creatively named DrugIds.

Then I fill in the tables, starting with the Patients table:

   while (switch != 1) {

    val Patient = CurPatient.split("\\s+")

    if (Patient(2).toUpperCase() == "NA" || (Patient(2).toFloat % 1 != 0))
      age = -1
    else age = Patient(2).toInt

    val insertPatient: DBIO[Option[Int]] = patientsss ++= Seq(
      (Patient(0), Patient(1), age, Patient(3))
    )       

    var future = db.run(insertPatient)



    CurPatient = PatientReader.readLine()

    if (CurPatient == null)
      switch = 1 //switch to 1

  }

For the DrugEffects table, I do the following:

 while (switch != 1) {

    val Effect = CurEffect.split("\\s+")

    for (i <- 1 until DrugIds.size - 1) {
      if (Effect(i).toUpperCase() == "NA")
        d_ef = -1.00

      else d_ef = (Effect(i).toFloat).asInstanceOf[Double]


      val insertEffect: DBIO[Option[Int]] = d_effects ++= Seq(
        (DrugIds(i), Effect(0), d_ef)
      )
      var future2 = db.run(insertEffect)
    }


    CurEffect = EffectReader.readLine()

    if (CurEffect == null)
      switch = 1
  }

Then I run a query with the following piece of code:

val q1 = for {
    c <- patientsss
  } yield (c.PID, c.Gender, c.Age, c.Ethnicity)

  db.stream(q1.result).foreach(println)

This should just give me all the data in the Patients table, but it doesn't always do that.

Sometimes, I get the following error (but not always):

java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$3@47089c2c rejected from java.util.concurrent.ThreadPoolExecutor@6453123[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 215]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
at slick.backend.DatabaseComponent$DatabaseDef$class.scheduleSynchronousStreaming(DatabaseComponent.scala:253)
at slick.jdbc.JdbcBackend$DatabaseDef.scheduleSynchronousStreaming(JdbcBackend.scala:38)
at slick.backend.DatabaseComponent$BasicStreamingActionContext.restartStreaming(DatabaseComponent.scala:516)
at slick.backend.DatabaseComponent$BasicStreamingActionContext.request(DatabaseComponent.scala:531)
at slick.backend.DatabasePublisher$$anon$3$$anonfun$onNext$2.apply(DatabasePublisher.scala:50)
at slick.backend.DatabasePublisher$$anon$3$$anonfun$onNext$2.apply(DatabasePublisher.scala:49)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

If I run a more complex query, the rows I get back match the query's conditions, but the same problems occur: across reruns, the results are duplicated, missing, or incomplete.

Explain like I'm 5 if you can, or point me to a resource that can help me solve these problems.

EDIT:

bjfletcher's answer worked (thanks!), but now I have another problem:

Every now and again, the code fails with this error:

    Exception in thread "main" org.h2.jdbc.JdbcSQLException: Table "Patientss" not found; SQL statement:
insert into "Patientss" ("Patient Id","Gender","Age","Ethnicity")  values (?,?,?,?) [42102-162]
    at org.h2.message.DbException.getJdbcSQLException(DbException.java:329)
    at org.h2.message.DbException.get(DbException.java:169)
    at org.h2.message.DbException.get(DbException.java:146)
    at org.h2.command.Parser.readTableOrView(Parser.java:4758)
    at org.h2.command.Parser.readTableOrView(Parser.java:4736)
    at org.h2.command.Parser.parseInsert(Parser.java:954)
    at org.h2.command.Parser.parsePrepared(Parser.java:375)
    at org.h2.command.Parser.parse(Parser.java:279)
    at org.h2.command.Parser.parse(Parser.java:251)
    at org.h2.command.Parser.prepareCommand(Parser.java:217)
    at org.h2.engine.Session.prepareLocal(Session.java:415)
    at org.h2.engine.Session.prepareCommand(Session.java:364)
    at org.h2.jdbc.JdbcConnection.prepareCommand(JdbcConnection.java:1121)
    at org.h2.jdbc.JdbcPreparedStatement.<init>(JdbcPreparedStatement.java:71)
    at org.h2.jdbc.JdbcConnection.prepareStatement(JdbcConnection.java:267)
    at slick.jdbc.JdbcBackend$SessionDef$class.prepareStatement(JdbcBackend.scala:252)
    at slick.jdbc.JdbcBackend$BaseSession.prepareStatement(JdbcBackend.scala:386)
    at slick.jdbc.JdbcBackend$SessionDef$class.withPreparedStatement(JdbcBackend.scala:301)
    at slick.jdbc.JdbcBackend$BaseSession.withPreparedStatement(JdbcBackend.scala:386)
    at slick.driver.JdbcInsertInvokerComponent$BaseInsertInvoker.preparedInsert(JdbcInsertInvokerComponent.scala:177)
    at slick.driver.JdbcInsertInvokerComponent$BaseInsertInvoker$$anonfun$internalInsertAll$1.apply(JdbcInsertInvokerComponent.scala:201)
    at slick.jdbc.JdbcBackend$BaseSession.withTransaction(JdbcBackend.scala:422)
    at slick.driver.JdbcInsertInvokerComponent$BaseInsertInvoker.internalInsertAll(JdbcInsertInvokerComponent.scala:198)
    at slick.driver.JdbcInsertInvokerComponent$BaseInsertInvoker.insertAll(JdbcInsertInvokerComponent.scala:194)
    at slick.driver.JdbcInsertInvokerComponent$InsertInvokerDef$class.$plus$plus$eq(JdbcInsertInvokerComponent.scala:73)
    at slick.driver.JdbcInsertInvokerComponent$BaseInsertInvoker.$plus$plus$eq(JdbcInsertInvokerComponent.scala:152)
    at slick.driver.JdbcActionComponent$InsertActionComposerImpl$$anonfun$$plus$plus$eq$1.apply(JdbcActionComponent.scala:459)
    at slick.driver.JdbcActionComponent$InsertActionComposerImpl$$anonfun$$plus$plus$eq$1.apply(JdbcActionComponent.scala:459)
    at slick.driver.JdbcActionComponent$InsertActionComposerImpl$$anon$8.run(JdbcActionComponent.scala:449)
    at slick.driver.JdbcActionComponent$InsertActionComposerImpl$$anon$8.run(JdbcActionComponent.scala:447)
    at slick.backend.DatabaseComponent$DatabaseDef$$anon$2.liftedTree1$1(DatabaseComponent.scala:231)
    at slick.backend.DatabaseComponent$DatabaseDef$$anon$2.run(DatabaseComponent.scala:231)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

This doesn't happen every time, but it happens very often, and I have no clue what it means.

Answer 1:

All the DB calls return immediately with a Future, even if the operation has not finished yet. The API is asynchronous, not synchronous.
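To make the "returns immediately" behaviour concrete, here is a minimal sketch that uses only the Scala standard library. The `slowInsert` function is a hypothetical stand-in for `db.run(...)`, not part of Slick; it only imitates a call that hands back a Future before the work is done.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureDemo {
  // Hypothetical stand-in for db.run(...): returns at once, finishes later.
  def slowInsert(row: String): Future[Int] = Future {
    Thread.sleep(200) // simulate the database doing its work
    1                 // pretend that one row was inserted
  }

  def main(args: Array[String]): Unit = {
    val f = slowInsert("patient-1")

    // At this point the work has most likely not finished yet,
    // so reading "the result" now would find nothing.
    println(s"completed right after the call? ${f.isCompleted}")

    // Blocking until the Future completes makes the result available.
    val rows = Await.result(f, 5.seconds)
    println(s"rows inserted after waiting: $rows")
  }
}
```

A query fired right after a batch of un-awaited inserts is in the same position as the first `println`: it may run before some or all of the inserts have completed, which would be consistent with the incomplete and changing results described in the question.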

You can change your code to accommodate the Futures in one of two ways:

  1. You can wrap each DB call in Await.result to block at that point until the call completes, for example: Await.result(db.run(insertEffect), Duration.Inf)

  2. You can use .map (or .flatMap if the code inside returns another Future) to supply the code you want to run once the DB operation is complete, for example: db.run(insertEffect).map(_ => ... do stuff... ). This needs an implicit ExecutionContext in scope.

Have a look at another Stack Overflow thread about this exception for some ideas as to its cause.
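As a rough sketch of how this could look with Slick 3.0 and H2, the example below chains table creation, the inserts, and the query into one action with `andThen`, then waits for it once. The cut-down two-column table, the `SequencedSetup` and `loadAll` names, the database name `demo`, and the sample rows are all made up for illustration. It also assumes that `keepAliveConnection = true` is what keeps the in-memory database alive between steps. Running the create step first and waiting for it should also avoid the intermittent "Table not found" error from the edit, which is probably an insert reaching the database before the table exists.

```scala
import slick.driver.H2Driver.api._
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object SequencedSetup {
  // Cut-down, illustrative version of the Patients table from the question.
  class Patients(tag: Tag) extends Table[(String, Int)](tag, "Patientss") {
    def PID = column[String]("Patient Id", O.PrimaryKey)
    def Age = column[Int]("Age")
    def * = (PID, Age)
  }
  val patients = TableQuery[Patients]

  // Creates the table, inserts two sample rows, reads them back.
  def loadAll(): Seq[(String, Int)] = {
    // Assumption: in-memory H2 database named "demo" (hypothetical name).
    val db = Database.forURL("jdbc:h2:mem:demo",
      driver = "org.h2.Driver", keepAliveConnection = true)
    try {
      // andThen starts the next action only after the previous one succeeded,
      // so the insert can never run before the table exists.
      val setup = patients.schema.create andThen
        (patients ++= Seq(("P1", 34), ("P2", -1)))
      val all = setup andThen patients.result

      // One blocking wait at the very end; closing db before this returns
      // would shut down the executor while work is still pending.
      Await.result(db.run(all), Duration.Inf)
    } finally db.close()
  }

  def main(args: Array[String]): Unit =
    loadAll().foreach(println)
}
```

Collecting all rows into one `++=` per table instead of calling `db.run` once per row inside the loop also leaves only a single Future to wait for.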