In Slick's documentation, the Reactive Streams examples only cover reading data by means of a DatabasePublisher. But what happens when you want to use your database as a Sink and backpressure based on your insertion rate?
I've looked for an equivalent DatabaseSubscriber but it doesn't exist. So the question is, if I have a Source, say:
val source = Source(0 to 100)
how can I create a Sink with Slick that writes those values into a table with the schema:
create table NumberTable (value INT)
Serial Inserts
The easiest way would be to do inserts within a Sink.foreach.
Assuming you've used the schema code generation and further assuming your table is named "NumberTable":
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
We can write a function that does the insertion
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
And that function can be placed in the Sink:
val insertSink = Sink.foreach[Int](num => insertIntoDb(num))
Source(0 to 100) runWith insertSink
Batched Inserts
You could further extend the Sink methodology by batching N inserts at a time:
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink.foreach[Seq[Int]](nums => batchInsertIntoDb(nums))
This batched Sink can be fed by a Flow which does the batch grouping:
val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)
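To see what the grouping stage actually hands to the sink, here is a minimal sketch with no database involved. The object name `GroupedSketch` and the helper `batchSizes` are made up for illustration, and it assumes an Akka Streams version where `ActorMaterializer` is available. It only records the size of each batch that `grouped` emits.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Slick or Akka
object GroupedSketch {
  // Runs Source(0 to 100) through grouped(batchSize) and returns each batch's size
  def batchSizes(batchSize: Int): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("grouped-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      Await.result(
        Source(0 to 100)
          .grouped(batchSize)   // emits Seq[Int] batches of up to batchSize elements
          .map(_.size)          // keep only the size of each batch
          .runWith(Sink.seq),   // collect all sizes
        10.seconds)
    } finally system.terminate()
  }
}
```

`GroupedSketch.batchSizes(10)` gives ten batches of 10 followed by a final batch of 1, because 0 to 100 is 101 elements. So the batched insert function has to cope with a last batch that is smaller than batchSize.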
Although you can use a Sink.foreach to achieve this (as mentioned by Ramon), it is safer and likely faster (because the inserts can run in parallel) to use the mapAsync Flow. The problem you will face with Sink.foreach is that it does not have a return value. Inserting into a database via Slick's db.run method returns a Future, which then escapes the stream: the stream's returned Future[Done] completes as soon as the Sink.foreach finishes, not when the inserts do.
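// Imports assumed for the snippets below. The profile import is the Slick 3.2+ name;
// older Slick versions use slick.driver.PostgresDriver.api._ instead.
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import slick.jdbc.PostgresProfile.api._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext for the Future callbacks below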
class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
def value = column[Int]("value")
def * = value
}
val numbers = TableQuery[Numbers]
val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)
val streamFuture: Future[Done] = Source(0 to 100)
.runWith(Sink.foreach[Int] { (i: Int) =>
db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
})
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")
//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done
On the other hand, the def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T]) Flow allows you to run the inserts in parallel via the parallelism parameter, and accepts a function from the upstream out value to a future of some type. This matches our i => db.run(numbers += i) function. The great thing about this Flow is that it then feeds the results of these Futures downstream.
val streamFuture2: Future[Done] = Source(0 to 100)
.mapAsync(1) { (i: Int) =>
db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
}
.runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")
//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 2 done <-- stream Future[Done] returned after inserts finished
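Since the question was about backpressure, here is a rough sketch of how mapAsync limits the number of inserts running at once. There is no database in it: `fakeInsert` is a hypothetical stand-in for db.run(numbers += i) that just sleeps briefly, and `MapAsyncBackpressureSketch` and `maxInFlight` are names invented for this example. It counts how many simulated inserts are in flight at the same time and returns the peak.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of Slick or Akka
object MapAsyncBackpressureSketch {
  // Returns the largest number of simulated inserts that were running at once
  def maxInFlight(parallelism: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("map-async-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)

    // Stand-in for db.run(numbers += i): a slow Future that "inserts" one row
    def fakeInsert(i: Int): Future[Int] = {
      val now = inFlight.incrementAndGet()
      // The stage invokes this function one element at a time, so a plain check is enough
      if (now > peak.get) peak.set(now)
      Future {
        Thread.sleep(5)             // simulated insert latency
        inFlight.decrementAndGet()  // done before the Future completes
        1
      }
    }

    try {
      Await.result(
        Source(0 to 100).mapAsync(parallelism)(fakeInsert).runWith(Sink.ignore),
        30.seconds)
      peak.get
    } finally system.terminate()
  }
}
```

Calling `MapAsyncBackpressureSketch.maxInFlight(4)` should never report more than 4. mapAsync stops pulling from upstream once that many Futures are pending, so a slow database slows the Source down instead of piling up inserts.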
To prove the point, you can even return a real result from the stream rather than a Future[Done] (with Done representing Unit). This stream also uses a higher parallelism value and batching for extra performance. *
val streamFuture3: Future[Int] = Source(0 to 100)
.via(Flow[Int].grouped(10)) // Batch in size 10
.mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
.runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")
// sample 3 output:
// stream 3 done, inserted 101 rows
* Note: You probably won't see better performance for such a small data set, but when I was dealing with a 1.7M insert I got the best performance on my machine with a batch size of 1000 and a parallelism value of 8, running locally against PostgreSQL. This was about twice as fast as not running in parallel. As always when dealing with performance, your results may vary and you should measure for yourself.
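If you want something you can hand around as an actual Sink, as the question asked, one option is to package the grouped + mapAsync + fold pipeline with toMat. This is only a sketch under some assumptions: `BatchedInsertSink` and `demo` are names made up here, the insert function is passed in as a parameter so the example runs without a database, and `fakeInsert` simply reports the batch size as the number of rows inserted.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of Slick or Akka
object BatchedInsertSink {
  // Builds a Sink that batches elements, inserts batches in parallel,
  // and materializes the total number of rows reported by insertBatch
  def apply(batchSize: Int, parallelism: Int)(
      insertBatch: Seq[Int] => Future[Int]): Sink[Int, Future[Int]] =
    Flow[Int]
      .grouped(batchSize)                 // batch the incoming elements
      .mapAsync(parallelism)(insertBatch) // at most `parallelism` inserts in flight
      .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right) // sum the per-batch counts

  // Runs the sink against a fake insert function instead of a real database
  def demo(): Int = {
    implicit val system: ActorSystem = ActorSystem("sink-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Stand-in for db.run(numbers ++= ints).map(_.getOrElse(0))
    val fakeInsert: Seq[Int] => Future[Int] = batch => Future(batch.size)

    try Await.result(Source(0 to 100).runWith(BatchedInsertSink(10, 2)(fakeInsert)), 10.seconds)
    finally system.terminate()
  }
}
```

`BatchedInsertSink.demo()` returns 101, matching the row count in sample 3. With the table definition above, the insert function would be ints => db.run(numbers ++= ints).map(_.getOrElse(0)), and the materialized Future[Int] completes only after every batch has been written.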