I have a list of ISBNs, and for each one I run a search against Amazon. I've already solved this sequentially, so the task now is to add concurrency. I tried it using core.async. The problem is that once the searches are complete, I need to merge all the books into one collection so that I can sort them by book rank. Since I am using a single channel with a buffer size of 10, I'm not sure how to go about that. My approach may be totally wrong. Help is appreciated.
Here is the function for concurrency:
(def book_channel (chan 10))
(defn concurrency_test [list_of_isbns]
  (doseq [isbn list_of_isbns]
    (go (>! book_channel (get_title_and_rank_for_one_isbn (amazon_search isbn))))))
get title:
(defn get_title_and_rank_for_one_isbn [amazon_report]
  ;; let keeps these bindings local; def inside a function would create global vars
  (let [book_title (get-in amazon_report [:items 0 :item-atributes :title])
        sales_rank (get-in amazon_report [:items 0 :SalesRank])
        book_isbn  (get-in amazon_report [:items 0 :asin])]
    [book_title book_isbn sales_rank]))
and the call:
(def list_of_isbns (split_isbns "src/clj_amazon/isbn_list.txt"))
(concurrency_test list_of_isbns)
If your goal is to overlap all I/O-bound tasks (e.g. amazon-search) and parallelize all CPU-bound tasks (e.g. parsing the report), then you should look into core.async's pipeline functions. Here is some pseudocode illustrating their usage:
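The pipeline approach might look roughly like the sketch below. It is not the original answer's code: `fake-amazon-search`, `parse-report` and `fetch-books` are hypothetical names, and the search is stubbed out so the example runs without network access. It assumes core.async 1.2 or later for `to-chan!` (older versions have `to-chan`). `pipeline-blocking` is used for the I/O-bound lookup and `pipeline` for the CPU-bound parsing; both close their output channel when the input channel closes, which is what lets `async/into` finish.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the question's amazon_search (no network needed).
(defn fake-amazon-search [isbn]
  (Thread/sleep 50) ; simulate network latency
  {:items [{:asin isbn
            :SalesRank (count isbn) ; made-up rank, for illustration only
            :item-atributes {:title (str "Title for " isbn)}}]})

;; Hypothetical stand-in for get_title_and_rank_for_one_isbn.
(defn parse-report [report]
  [(get-in report [:items 0 :item-atributes :title])
   (get-in report [:items 0 :asin])
   (get-in report [:items 0 :SalesRank])])

(defn fetch-books
  "Looks up all ISBNs with up to `parallelism` concurrent searches and
  returns the [title isbn rank] vectors sorted by rank."
  [isbns parallelism]
  (let [isbn-ch   (async/to-chan! isbns) ; closes after the last ISBN is taken
        report-ch (async/chan parallelism)
        book-ch   (async/chan parallelism)]
    ;; I/O-bound stage: blocking calls run on dedicated threads.
    (async/pipeline-blocking parallelism report-ch (map fake-amazon-search) isbn-ch)
    ;; CPU-bound stage: parsing the reports.
    (async/pipeline parallelism book-ch (map parse-report) report-ch)
    ;; book-ch closes once both stages are drained, so `into` delivers a vector.
    (sort-by last (async/<!! (async/into [] book-ch)))))
```

Calling `(fetch-books list_of_isbns 10)` with the real search and parsing functions swapped in would block until every lookup has finished and then return the sorted collection.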
You should be able to use core.async's reduce to create a collection of all items in the channel, but remember to close the channel, because reduce will not return a result until the channel has been closed.
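As a minimal sketch of that idea (the helper name `collect-results` is hypothetical, not from the question): each item is processed on its own thread via `async/thread`, which returns a channel that closes after delivering its result. `async/merge` combines those channels and closes its output once every source has closed, so `async/reduce` can deliver the accumulated vector without a manual `close!`. Note that the results arrive in completion order, not input order, and that a `nil` result is silently dropped because `nil` cannot be put on a channel.

```clojure
(require '[clojure.core.async :as async])

(defn collect-results
  "Runs `f` on each item in its own thread and gathers the results
  into a vector (in completion order)."
  [f items]
  (let [;; mapv is eager, so all threads start immediately;
        ;; each channel closes after delivering f's return value
        result-chs (mapv (fn [item] (async/thread (f item))) items)
        ;; merge closes its output once every source channel has closed
        merged     (async/merge result-chs)]
    ;; reduce delivers its result only after `merged` is closed
    (async/<!! (async/reduce conj [] merged))))
```

With the question's functions this could be used as `(sort-by last (collect-results (comp get_title_and_rank_for_one_isbn amazon_search) list_of_isbns))`, assuming the sales rank is the last element of each result vector, as in the question.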