Slick's documentation presents examples of using Reactive Streams only for reading data, by means of a DatabasePublisher. But what happens when you want to use your database as a Sink, with backpressure based on your insertion rate?
I've looked for an equivalent DatabaseSubscriber, but it doesn't exist. So the question is: if I have a Source, say:
val source = Source(0 to 100)
how can I create a Sink with Slick that writes those values into a table with the schema:
create table NumberTable (value INT)
Serial Inserts
The easiest way would be to do the inserts within a Sink.foreach. Assuming you've used the schema code generation, and further assuming your table is named "NumberTable":
We can write a function that does the insertion, and that function can be placed in the Sink.
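A minimal sketch of both pieces, assuming a hand-written stand-in for the generated schema code (the table class, the in-memory H2 database, and the name insertIntoDb are illustrative, not from the original post; swap in your own profile and generated code):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import slick.jdbc.H2Profile.api._ // swap in your real profile, e.g. PostgresProfile

// Hand-written stand-in for the generated "NumberTable" code (names assumed)
class NumberTable(tag: Tag) extends Table[Int](tag, "NumberTable") {
  def value = column[Int]("value")
  def * = value
}
val numbers = TableQuery[NumberTable]

// In-memory H2 database purely for illustration
val db = Database.forURL("jdbc:h2:mem:serial;DB_CLOSE_DELAY=-1", driver = "org.h2.Driver")
Await.result(db.run(numbers.schema.create), 5.seconds)

// The insertion function: one row per stream element
def insertIntoDb(num: Int): Future[Int] = db.run(numbers += num)

// That function placed in the Sink; note that Sink.foreach discards the
// Futures returned by insertIntoDb
implicit val system: ActorSystem = ActorSystem("serial-inserts") // provides the materializer (Akka 2.6+)
val done = Source(0 to 100).runWith(Sink.foreach(insertIntoDb))
```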
Batched Inserts
You could further extend the Sink methodology by batching N inserts at a time:
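A sketch of that batched variant, under the same assumptions (hand-written schema stand-in, in-memory H2, illustrative names). Slick's ++= inserts a whole batch in one statement, so the Sink consumes a Seq[Int] per element:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import slick.jdbc.H2Profile.api._ // swap in your real profile

// Illustrative stand-in for the generated schema code
class NumberTable(tag: Tag) extends Table[Int](tag, "NumberTable") {
  def value = column[Int]("value")
  def * = value
}
val numbers = TableQuery[NumberTable]
val db = Database.forURL("jdbc:h2:mem:batched;DB_CLOSE_DELAY=-1", driver = "org.h2.Driver")
Await.result(db.run(numbers.schema.create), 5.seconds)

// ++= inserts N values in a single statement
def insertBatchIntoDb(batch: Seq[Int]): Future[Option[Int]] =
  db.run(numbers ++= batch)

// grouped(10) is the Flow stage that does the batch grouping
implicit val system: ActorSystem = ActorSystem("batched-inserts")
val done = Source(0 to 100).grouped(10).runWith(Sink.foreach(insertBatchIntoDb))
```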
This batched Sink can be fed by a Flow which does the batch grouping.

Although you can use a Sink.foreach to achieve this (as mentioned by Ramon), it is safer and likely faster (by running the inserts in parallel) to use the mapAsync Flow. The problem you will face with Sink.foreach is that it does not have a return value. Inserting into a database via Slick's db.run method returns a Future, which will then escape out of the stream's returned Future[Done], which completes as soon as the Sink.foreach finishes.

On the other hand, the

def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])

Flow allows you to run the inserts in parallel via the parallelism parameter, and it accepts a function from the upstream out value to a Future of some type. This matches our i => db.run(numbers += i) function. The great thing about this Flow is that it then feeds the results of those Futures downstream.

To prove the point, you can even return a real result from the stream rather than a Future[Done] (with Done representing Unit). Such a stream can also use a higher parallelism value and batching for extra performance.
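A sketch of such a stream, again assuming a hand-written stand-in for the generated schema (table class, H2 in-memory database, and parallelism value are all illustrative): mapAsync runs the batched inserts in parallel, the insert counts flow downstream, and a Sink.fold sums them so the materialized value is a real Future[Int] rather than a Future[Done]:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import slick.jdbc.H2Profile.api._ // swap in your real profile

// Illustrative stand-in for the generated schema code
class NumberTable(tag: Tag) extends Table[Int](tag, "NumberTable") {
  def value = column[Int]("value")
  def * = value
}
val numbers = TableQuery[NumberTable]
val db = Database.forURL("jdbc:h2:mem:parallel;DB_CLOSE_DELAY=-1", driver = "org.h2.Driver")
Await.result(db.run(numbers.schema.create), 5.seconds)

implicit val system: ActorSystem = ActorSystem("parallel-inserts")

// Batched, parallel inserts whose results flow downstream into a real total
val insertedRows: Future[Int] =
  Source(0 to 100)
    .grouped(10)                                           // batching
    .mapAsync(parallelism = 4)(b => db.run(numbers ++= b)) // parallel inserts
    .map(_.getOrElse(0))                                   // ++= yields Option[Int]
    .runWith(Sink.fold(0)(_ + _))                          // a real result, not Done
```

Because mapAsync awaits the database Futures as part of the stream, completion of insertedRows actually means every insert has finished, unlike the Sink.foreach approach.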