It's a continuation of my previous question, "How to produce a lazy sequence by portion in clojure?"
I want to download data from a database in portions. Initially I download the first 500 rows, then I send a request to fetch the next 500 rows, and so on until I have received all the data from the server.
I wrote the code:
(jdbc/atomic conn
  (with-open [cursor (jdbc/fetch-lazy conn [sql_query])]
    (let [lazyseq (jdbc/cursor->lazyseq cursor)
          counter (atom 1)]
      (swap! lazyseq_maps assoc :session_id {:get_next? (chan 1) :over_500 (chan 1) :data []})
      (>!! (:get_next? (:session_id @lazyseq_maps)) true)
      (go
        (doseq [row lazyseq]
          (swap! counter inc)
          (when (<! (:get_next? (:session_id @lazyseq_maps)))
            (swap! lazyseq_maps update-in [:session_id :data] conj row)
            (if (not= 0 (mod @counter 500))
              (>! (:get_next? (:session_id @lazyseq_maps)) true)
              (>! (:over_500 (:session_id @lazyseq_maps)) true))))
        ;
        (close! (:get_next? (:session_id @lazyseq_maps)))
        (close! (:over_500 (:session_id @lazyseq_maps)))
        (.close conn))
      (when (<!! (:over_500 (:session_id @lazyseq_maps)))
        {:message "over 500 rows"
         :id :session_id
         :data (:data (:session_id @lazyseq_maps))}))))
I fetch rows with the doseq loop. Once doseq has processed 500 rows, I park the loop at (when (<! (:get_next? (:session_id @lazyseq_maps))) and wait for a signal from outside to retrieve the next 500 rows.
But here I have a problem. When I send the signal, the program throws the error "Resultset is closed", i.e. the connection is closed outside the with-open scope. But I don't understand why, because the go block is placed inside the with-open scope. Can you help me solve the problem?
Unless you are on a seriously memory-constrained system, I would recommend just loading all rows into RAM at once and closing the DB connection. Otherwise your complete solution will likely be very complex and difficult to test and reason about.
If you have tens of millions of rows, maybe you could fetch them in partitions?
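As a rough illustration of the partition idea, here is a minimal sketch that builds a sequence of pages from a paging function. The names paged-rows, fetch-page, fake-table and fake-fetch are all hypothetical and not part of any library. In a real application fetch-page would presumably run its own short query per call (for example one using LIMIT/OFFSET with a stable ORDER BY), so no cursor has to stay open between requests. Here an in-memory vector stands in for the table so the example runs on its own.

```clojure
;; Sketch only: fetch-page is an assumed function of [offset limit]
;; that returns a (possibly empty) vector of rows.
(defn paged-rows
  "Returns a lazy seq of pages, stopping at the first empty page."
  [fetch-page page-size]
  (->> (iterate #(+ % page-size) 0)        ; offsets 0, page-size, 2*page-size, ...
       (map #(fetch-page % page-size))     ; one call to fetch-page per page
       (take-while seq)))                  ; stop when a page comes back empty

;; In-memory stand-in for a database table (assumption for the demo).
(def fake-table (vec (range 1200)))

(defn fake-fetch
  "Pretends to run a paged query against fake-table."
  [offset limit]
  (->> fake-table (drop offset) (take limit) vec))

;; Usage: sizes of the pages produced for a page size of 500.
(def page-sizes (map count (paged-rows fake-fetch 500)))
```

With this shape, the "send me the next 500 rows" request from the client just asks for the next page, and each call can open and close its own connection.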
(go ...) returns immediately, and therefore so does (with-open ...). You may want to do it the other way around:
(go (with-open ...))
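To make the ordering issue concrete, here is a small sketch that should run with clojure.core alone. Note the swap: it uses future and a promise in place of go and a channel, and a reify'd java.io.Closeable in place of the JDBC cursor, so the names make-resource, async-inside and async-outside are purely illustrative. The point it tries to show is the same: whatever is started asynchronously inside with-open keeps running after with-open has already closed the resource.

```clojure
(import 'java.io.Closeable)

(defn make-resource
  "Hypothetical stand-in for a cursor: records whether close was called."
  []
  (let [open? (atom true)]
    {:open?  open?
     :handle (reify Closeable
               (close [_] (reset! open? false)))}))

(defn async-inside
  "Mirrors the question: async work is started INSIDE with-open.
   Returns whether the resource was still open when the work ran."
  []
  (let [{:keys [open? handle]} (make-resource)
        gate   (promise)                    ; plays the role of the 'get next' signal
        result (with-open [r handle]
                 (future @gate @open?))]    ; returns at once, then with-open closes r
    (deliver gate true)                     ; the signal arrives after the close
    @result))

(defn async-outside
  "The inversion: with-open lives INSIDE the async body."
  []
  (let [{:keys [open? handle]} (make-resource)
        gate   (promise)
        result (future
                 (with-open [r handle]
                   @gate                    ; waiting happens while r is still open
                   @open?))]
    (deliver gate true)
    @result))
```

Calling (async-inside) reports the resource as already closed when the deferred work runs, while (async-outside) reports it as still open, which matches the "Resultset is closed" symptom and the suggested fix.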
However, do note that this process will hold on to a database connection (a scarce resource!) for a potentially very long time, which may not be desirable, and kind of goes against the benefit of having 'lightweight' threads thanks to go blocks. Here are some alternatives to consider: