Clojure core.async. How to download lazily with go

Posted 2019-07-19 02:06

Question:

It's a continuation of my previous question, How to produce a lazy sequence by portion in clojure?

I want to download data from a database in portions. Initially I download the first 500 rows, then I send a request to fetch the next 500 rows, and so on until I receive all the data from the server.

I wrote the following code:

(jdbc/atomic conn
  (with-open [cursor (jdbc/fetch-lazy conn [sql_query])]
    (let [lazyseq (jdbc/cursor->lazyseq cursor)
          counter (atom 1)]
      (swap! lazyseq_maps assoc :session_id {:get_next? (chan 1) :over_500 (chan 1) :data []})
      (>!! (:get_next? (:session_id @lazyseq_maps)) true)
      (go
        (doseq [row lazyseq]
          (swap! counter inc)
          (when (<! (:get_next? (:session_id @lazyseq_maps)))
            (swap! lazyseq_maps update-in [:session_id :data] conj row)
            (if (not= 0 (mod @counter 500))
              (>! (:get_next? (:session_id @lazyseq_maps)) true)
              (>! (:over_500 (:session_id @lazyseq_maps)) true))))
        ;; once the whole seq is consumed, close the channels and the connection
        (close! (:get_next? (:session_id @lazyseq_maps)))
        (close! (:over_500 (:session_id @lazyseq_maps)))
        (.close conn))

      (when (<!! (:over_500 (:session_id @lazyseq_maps)))
        {:message "over 500 rows"
         :id :session_id
         :data (:data (:session_id @lazyseq_maps))}))))

I fetch rows within the doseq loop. Once doseq has passed 500 rows, I park the loop at (when (<! (:get_next? (:session_id @lazyseq_maps))) ...) and wait for a signal from outside before retrieving the next 500 rows.

But here I have a problem: when I send the signal, the program throws the error "ResultSet is closed", i.e. the connection has already been closed outside the with-open scope. I don't understand why, because the go block is placed inside the with-open scope. Can you help me solve the problem?

Answer 1:

(go ...) returns immediately, and therefore so does (with-open ...), which closes the cursor and the connection while the go block is still consuming the lazy sequence.

You may want to do it the other way around:

(go (with-open ...))
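
For instance, a minimal sketch of that inversion, reusing the question's names and assuming funcool/clojure.jdbc (the jdbc.core namespace, which the fetch-lazy/cursor->lazyseq calls suggest) plus a hypothetical db-spec; the channel bookkeeping is trimmed down to the parking step:

(require '[jdbc.core :as jdbc]
         '[clojure.core.async :refer [go <!]])

(go
  (with-open [conn (jdbc/connection db-spec)]
    (jdbc/atomic conn
      (with-open [cursor (jdbc/fetch-lazy conn [sql_query])]
        (doseq [row (jdbc/cursor->lazyseq cursor)]
          ;; park between batches exactly as before; the cursor stays
          ;; valid because with-open now only closes when doseq finishes
          (when (<! (:get_next? (:session_id @lazyseq_maps)))
            (swap! lazyseq_maps update-in [:session_id :data] conj row)))))))

Bear in mind that go blocks run on a small fixed thread pool and the JDBC calls here still block, so clojure.core.async/thread may be a better fit than go for the database work.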

However, do note that this process will hold on to a database connection (a scarce resource!) for a potentially very long time, which may not be desirable, and kind of goes against the benefit of having 'lightweight' threads thanks to go blocks. Here are some alternatives to consider:

  • Maybe you could re-open a database connection for each batch? (See the sketch after this list.)
  • Maybe you could eagerly stream the whole result set to an external store (e.g. AWS S3) and have the client poll against that?
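
To sketch the first alternative (the users table and id keyset column are illustrative, with jdbc.core as above), each batch opens and closes its own connection:

(defn fetch-batch
  "Fetch up to 500 rows following last-id, using a fresh connection
   per call so nothing is held open between client requests."
  [db-spec last-id]
  (with-open [conn (jdbc/connection db-spec)]
    (jdbc/fetch conn ["SELECT * FROM users WHERE id > ? ORDER BY id LIMIT 500"
                      last-id])))

The client drives pagination by echoing back the highest id it has seen, so the server never parks a connection waiting for the next request.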


Answer 2:

Unless you are on a seriously memory-constrained system, I would recommend just loading all the rows into RAM at once and then closing the DB connection. Otherwise your complete solution will likely be very complex and difficult to test and reason about.
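
For example, a minimal eager version, assuming the same jdbc.core API as the question (jdbc/fetch realizes the whole result set into a vector):

(with-open [conn (jdbc/connection db-spec)]
  (jdbc/fetch conn [sql_query]))  ; all rows in memory; conn closes right after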

If you have tens of millions of rows, maybe you could fetch them in partitions, along the lines of the per-batch sketch in the first answer?