Merging an async channel buffer

Posted 2019-07-27 20:02

I have a list of ISBNs, and for each one I run a search against Amazon. I've already solved this sequentially, so the task now is to make it concurrent. I tried using core.async. The problem is that once the searches are complete, I need to merge all the books into a single collection so that I can sort them by sales rank. Since I am using one channel with a buffer size of 10, I'm not sure how to go about that. My approach may be totally wrong. Any help is appreciated.

Here is the function for concurrency:

(def book_channel (chan 10))

;; Start one go block per ISBN; each puts its [title isbn rank] result on book_channel.
(defn concurrency_test [list_of_isbns]
  (doseq [isbn list_of_isbns]
    (go (>! book_channel (get_title_and_rank_for_one_isbn (amazon_search isbn))))))

The function that extracts the title and rank:

(defn get_title_and_rank_for_one_isbn [amazon_report]
  ;; let keeps these bindings local; def inside a function would create globals
  (let [book_title (get-in amazon_report [:items 0 :item-atributes :title])
        sales_rank (get-in amazon_report [:items 0 :SalesRank])
        book_isbn  (get-in amazon_report [:items 0 :asin])]
    [book_title book_isbn sales_rank]))

and the call:

(def list_of_isbns (split_isbns "src/clj_amazon/isbn_list.txt"))
(concurrency_test list_of_isbns)

2 Answers
Explosion°爆炸
#2 · 2019-07-27 21:00

If your goal is to overlap all I/O-bound tasks (e.g. amazon-search) and parallelize all CPU-bound tasks (e.g. parsing the report), then you should look into the pipeline functions. Here is some pseudocode illustrating their usage:

(let [isbn>   (chan)
      report> (chan)
      out>    (chan)]

  ;; pipeline-async will take isbn from isbn> channel, invoke
  ;; amazon-search-async and pipe the result to report> channel.

  (pipeline-async 10  ;; up to 10 I/O bound requests
                  report>
                  amazon-search-async
                  isbn>)

  ;; pipeline will take report from report> channel and feed it
  ;; to the transducer (map get-title-and-rank-etc) for processing,
  ;; the processed report will be pushed to the out> channel.

  (pipeline (.. Runtime getRuntime availableProcessors)
            out>
            (map get-title-and-rank-etc)
            report>)

  ;; read isbn from file and push it to isbn> channel
  (->> "isbn_list.txt"
       io/reader
       line-seq
       (onto-chan isbn>))

  ;; take all reports from the out> channel and sort them by rank & title
  (sort-by (juxt :rank :title) (<!! (async/into [] out>))))
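
To make the pseudocode above concrete, here is a minimal runnable sketch. It is only an illustration under stated assumptions: `fake-ranks`, the stubbed `amazon-search-async`, `get-title-and-rank`, and `ranked-books` are hypothetical stand-ins rather than the asker's real functions, and the shape of the fake report is invented. `onto-chan!` assumes a recent core.async release; older releases call it `onto-chan`.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical data standing in for real Amazon responses.
(def fake-ranks {"111" 30, "222" 10, "333" 20})

;; pipeline-async expects a function of [input result-channel] that
;; puts its result on the channel and then closes it.
(defn amazon-search-async [isbn result>]
  (async/go
    (async/>! result> {:asin isbn
                       :title (str "Book " isbn)
                       :sales-rank (fake-ranks isbn)})
    (async/close! result>)))

;; Hypothetical parser: reshapes the fake report into the fields we sort on.
(defn get-title-and-rank [report]
  {:title (:title report)
   :isbn  (:asin report)
   :rank  (:sales-rank report)})

(defn ranked-books [isbns]
  (let [isbn>   (async/chan)
        report> (async/chan)
        out>    (async/chan)]
    ;; up to 10 searches in flight at once
    (async/pipeline-async 10 report> amazon-search-async isbn>)
    ;; parse reports on 2 threads
    (async/pipeline 2 out> (map get-title-and-rank) report>)
    ;; feed the ISBNs in; isbn> is closed afterwards, and the close
    ;; propagates through report> to out>
    (async/onto-chan! isbn> isbns)
    ;; async/into yields its vector only once out> has closed
    (sort-by (juxt :rank :title) (async/<!! (async/into [] out>)))))

(def books (ranked-books ["111" "222" "333"]))
```

Because each stage closes its output channel when its input closes, nothing here has to close `out>` by hand; that is what lets `async/into` finish.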
等我变得足够好
#3 · 2019-07-27 21:02

You should be able to use

(async/reduce conj '() book-chan)

to create a collection of all items in the channel, but remember to close the channel because reduce will not return a result until the channel has been closed.
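
One way to handle the closing step, sketched under assumptions: every `go` block returns its own channel, which closes when the block finishes, so a supervising `go` block can wait on all of them and then close the shared channel. `fake-ranks`, `fake-lookup`, and `collect-books` are hypothetical names, and `fake-lookup` merely stands in for the real Amazon search plus parsing.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical data standing in for real Amazon responses.
(def fake-ranks {"111" 30, "222" 10, "333" 20})

;; Hypothetical lookup returning [title isbn rank], like the question's parser.
(defn fake-lookup [isbn]
  [(str "Book " isbn) isbn (fake-ranks isbn)])

(defn collect-books [isbns]
  (let [book-chan (async/chan 10)
        ;; one go block per ISBN; each returned channel closes when its put is done
        workers   (mapv (fn [isbn]
                          (async/go (async/>! book-chan (fake-lookup isbn))))
                        isbns)]
    ;; close book-chan only after every worker has finished
    (async/go
      (doseq [w workers]
        (async/<! w))
      (async/close! book-chan))
    ;; async/reduce delivers its result once book-chan is closed
    (async/<!! (async/reduce conj '() book-chan))))

;; The rank is the last element of each [title isbn rank] vector.
(def sorted-books (sort-by last (collect-books ["111" "222" "333"])))
```

The reduce drains `book-chan` while the workers are still putting, so the buffer size of 10 does not limit how many ISBNs can be processed.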
