Speeding up Clojure transactions

Posted 2019-08-16 13:03

I have two implementations of a customer flight-booking system in Clojure. The first is sequential and the second is my attempt at parallelizing it. My understanding is that the parallel implementation should be faster. The sequential implementation uses atoms, while the parallel implementation uses refs. The input is just two collections: a collection of flights and a collection of customers. The booking process adjusts the flights according to customer budgets and destinations. There is also a sales process that adjusts the flight prices. Note that the sales process, as well as the customer and flight collections, are the same for both implementations.

Below is the sequential implementation.

(ns flight-reservation
  (:require [clojure.string]
            [clojure.pprint]
            #_[input-simple :as input]
            [input-random :as input]))

(def logger (agent nil))
(defn log [& msgs] (send logger (fn [_] (apply println msgs))))
;(defn log [& msgs] nil)

(def flights
  "All flights are encapsulated in a single atom in this implementation.
  You are free to change this to a more appropriate mechanism."
  (atom []))

(defn initialize-flights [initial-flights]
  "Set `flights` atom to the `initial-flights`."
  (reset! flights initial-flights))

(defn print-flights [flights]
  "Print `flights`."
  (letfn [(pricing->str [pricing]
            (->> pricing
              (map (fn [[p a t]] (clojure.pprint/cl-format nil "$~3d: ~3d ~3d" p a t)))
              (clojure.string/join ", ")))]
    (doseq [{:keys [id from to pricing]} flights]
      (println (clojure.pprint/cl-format nil "Flight ~3d from ~a to ~a: ~a"
        id from to (pricing->str pricing))))))

(defn- update-pricing [flight factor]
  "Updated pricing of `flight` with `factor`."
  (update flight :pricing
    #(map (fn [[p a t]] [(* p factor) a t]) %)))

(defn start-sale [flight-ids]
  "Sale: -20% on `flight-ids`."
  (log "Start sale for flights" flight-ids)
  (swap! flights
    (fn [old-flights]
      (vec (map
             (fn [flight]
               (if (contains? flight-ids (:id flight))
                 (update-pricing flight 0.80)
                 flight))
             old-flights)))))

(defn end-sale [flight-ids]
  "End sale: +25% (inverse of -20%) on `flight-ids`."
  (log "End sale")
  (swap! flights
    (fn [old-flights]
      (vec (map
             (fn [flight]
               (if (contains? flight-ids (:id flight))
                 (update-pricing flight 1.25)
                 flight))
             old-flights)))))

(defn sort-pricing [pricing]
  "Sort `pricing` from lowest to highest price."
  (sort-by first pricing))

(defn filter-pricing-with-n-seats [pricing seats]
  "Get `pricing` for which there are at least `seats` empty seats available."
  (filter #(>= (second %) seats) pricing))

(defn lowest-available-price [flight seats]
  "Returns the lowest price in `flight` for which at least `seats` empty seats
  are available, or nil if none found."
  (-> (:pricing flight)                 ; [[price available taken]]
    (filter-pricing-with-n-seats seats)
    (sort-pricing)
    (first)                             ; [price available taken]
    (first)))                           ; price

(defn- find-flight [flights customer]
  "Find a flight in `flights` that is on the route and within the budget of
  `customer`. If a flight was found, returns {:flight flight :price price},
  else returns nil."
  (let [{:keys [_id from to seats budget]}
          customer
        flights-and-prices
          ; flights that are on the route and within budget, and their price
          (for [f flights
                :when (and (= (:from f) from) (= (:to f) to))
                :let [lowest-price (lowest-available-price f seats)]
                :when (and (some? lowest-price) (<= lowest-price budget))]
            {:flight f :price lowest-price})
        cheapest-flight-and-price
          (first (sort-by :price flights-and-prices))]
    cheapest-flight-and-price))

(defn- book [flight price seats]
  "Updates `flight` to book `seats` at `price`."
  (update flight :pricing
    (fn [pricing]
      (for [[p a t] pricing]
        (if (= p price)
          [p (- a seats) (+ t seats)]
          [p a t])))))

(defn- process-customer [flights customer]
  "Try to book a flight from `flights` for `customer`, returning the updated
  flight if found, or nil if no suitable flight was found."
  (if-let [{:keys [flight price]} (find-flight flights customer)]
    (let [updated-flight (book flight price (:seats customer))]
      (log "Customer" (:id customer) "booked" (:seats customer)
        "seats on flight" (:id updated-flight) "at $" price " (< budget of $"
        (:budget customer) ").")
      updated-flight)
    (do
      (log "Customer" (:id customer) "did not find a flight.")
      nil)))

(def finished-processing?
  "Set to true once all customers have been processed, so that sales process
  can end."
  (atom false))

(defn process-customers [customers]
  "Process `customers` one by one."
  (Thread/sleep 100)
  (doseq [customer customers]
    (swap! flights
      (fn [flights]
        (if-let [updated-flight (process-customer flights customer)]
          (assoc flights (:id updated-flight) updated-flight)
          flights))))
  (reset! finished-processing? true))

(defn sales-process []
  "The sales process starts and ends sales periods, until `finished-processing?`
  is true."
  (loop []
    (let [discounted-flight-ids (->> input/flights
                                     (map :id)
                                     shuffle
                                     (take input/NUMBER_OF_DISCOUNTED_FLIGHTS)
                                     set)]
      (Thread/sleep input/TIME_BETWEEN_SALES)
      (start-sale discounted-flight-ids)
      (Thread/sleep input/TIME_OF_SALES)
      (end-sale discounted-flight-ids))
    (if (not @finished-processing?)
      (recur))))

(defn main []
  (initialize-flights input/flights)
  (let [f1 (future (time (process-customers input/customers)))
        f2 (future (sales-process))]
    @f1
    @f2)
  (println "Flights:")
  (print-flights @flights))

(main)
(shutdown-agents)

And below is the parallel implementation:

(ns flight-reservation
  (:require [clojure.string]
            [clojure.pprint]
            #_[input-simple :as input]
            [input-random :as input]))
(def N-THREADS 32) 

(def logger (agent nil))
(defn log [& msgs] (send logger (fn [_] (apply println msgs))))
;(defn log [& msgs] nil)

(def flights
  "All flights are encapsulated in a single ref in this implementation.
  You are free to change this to a more appropriate mechanism."
  (ref []))

(defn initialize-flights [initial-flights]
  "Set `flights` ref to the `initial-flights`."
  (dosync (ref-set flights initial-flights)))

(defn print-flights [flights]
  "Print `flights`."
  (letfn [(pricing->str [pricing]
            (->> pricing
              (map (fn [[p a t]] (clojure.pprint/cl-format nil "$~3d: ~3d ~3d" p a t)))
              (clojure.string/join ", ")))]
    (doseq [{:keys [id from to pricing]} flights]
      (println (clojure.pprint/cl-format nil "Flight ~3d from ~a to ~a: ~a"
        id from to (pricing->str pricing))))))

(defn- update-pricing [flight factor]
  "Updated pricing of `flight` with `factor`."
  (update flight :pricing
    #(map (fn [[p a t]] [(* p factor) a t]) %)))

(defn start-sale [flight-ids]
  "Sale: -20% on `flight-ids`."
  (log "Start sale for flights" flight-ids)
  (dosync
    (alter flights
      (fn [old-flights]
        (vec (pmap
               (fn [flight]
                 (if (contains? flight-ids (:id flight))
                   (update-pricing flight 0.80)
                   flight))
               old-flights))))))

(defn end-sale [flight-ids]
  "End sale: +25% (inverse of -20%) on `flight-ids`."
  (log "End sale")
  (dosync
    (alter flights
      (fn [old-flights]
        (vec (pmap
               (fn [flight]
                 (if (contains? flight-ids (:id flight))
                   (update-pricing flight 1.25)
                   flight))
               old-flights))))))

(defn sort-pricing [pricing]
  "Sort `pricing` from lowest to highest price."
  (sort-by first pricing))

(defn filter-pricing-with-n-seats [pricing seats]
  "Get `pricing` for which there are at least `seats` empty seats available."
  (filter #(>= (second %) seats) pricing))

(defn lowest-available-price [flight seats]
  "Returns the lowest price in `flight` for which at least `seats` empty seats
  are available, or nil if none found."
  (-> (:pricing flight)                 ; [[price available taken]]
    (filter-pricing-with-n-seats seats)
    (sort-pricing)
    (first)                             ; [price available taken]
    (first)))                           ; price

(defn- find-flight [flights customer]
  "Find a flight in `flights` that is on the route and within the budget of
  `customer`. If a flight was found, returns {:flight flight :price price},
  else returns nil."
  (let [{:keys [_id from to seats budget]}
          customer
        flights-and-prices
          ; flights that are on the route and within budget, and their price
          (for [f flights
                :when (and (= (:from f) from) (= (:to f) to))
                :let [lowest-price (lowest-available-price f seats)]
                :when (and (some? lowest-price) (<= lowest-price budget))]
            {:flight f :price lowest-price})
        cheapest-flight-and-price
          (first (sort-by :price flights-and-prices))]
    cheapest-flight-and-price))

(defn- book [flight price seats]
  "Updates `flight` to book `seats` at `price`."
  (update flight :pricing
    (fn [pricing]
      (for [[p a t] pricing]
        (if (= p price)
          [p (- a seats) (+ t seats)]
          [p a t])))))

(defn- process-customer [flights customer]
  "Try to book a flight from `flights` for `customer`, returning the updated
  flight if found, or nil if no suitable flight was found."
  (if-let [{:keys [flight price]} (find-flight flights customer)]
    (let [updated-flight (book flight price (:seats customer))]
      (log "Customer" (:id customer) "booked" (:seats customer)
        "seats on flight" (:id updated-flight) "at $" price " (< budget of $"
        (:budget customer) ").")
      updated-flight)
    (do
      (log "Customer" (:id customer) "did not find a flight.")
      nil)))

(def finished-processing?
  "Set to true once all customers have been processed, so that sales process
  can end."
  (atom false))

(defn process-customers [customers]
  "Process `customers` one by one."
  (Thread/sleep 100)
  (dosync
    (doseq [customer customers]
      (alter flights
        (fn [flights]
          (if-let [updated-flight (process-customer flights customer)]
            (assoc flights (:id updated-flight) updated-flight)
            flights)))))
  (reset! finished-processing? true))

(defn sales-process []
  "The sales process starts and ends sales periods, until `finished-processing?`
  is true."
  (loop []
    (let [discounted-flight-ids (->> input/flights
                                     (pmap :id)
                                     shuffle
                                     (take input/NUMBER_OF_DISCOUNTED_FLIGHTS)
                                     set)]
      (Thread/sleep input/TIME_BETWEEN_SALES)
      (start-sale discounted-flight-ids)
      (Thread/sleep input/TIME_OF_SALES)
      (end-sale discounted-flight-ids)
      (if (not @finished-processing?)
        (recur)))))

(defn partitionCustomerInput
  [threads customers]
  (partition-all (Math/ceil (/ (count customers) threads)) customers))

(defn main []
  (initialize-flights input/flights)
  (let [f1 (time (doall (pmap process-customers (partitionCustomerInput N-THREADS input/customers))))
        f2 (future (sales-process))]
    @f2)
  (println "Flights:")
  (print-flights @flights))

(main)
(shutdown-agents)

The sequential implementation is faster than the parallel one. I do not understand why, since in the parallel implementation, inside main, I parallelize the customer processing with pmap after splitting the customer collection into as many partitions as the desired number of threads.
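For illustration, the partitioning step described above can be tried in isolation. This is only a sketch: the name `partition-customers` is hypothetical and not part of the original code, and it assumes it is acceptable to round the chunk size up to a whole number before handing it to `partition-all`.

```clojure
(defn partition-customers
  "Hypothetical helper: splits `customers` into at most `n-threads` chunks
  of roughly equal size. Not part of the original program."
  [n-threads customers]
  (let [;; round up so that the number of chunks never exceeds n-threads
        chunk-size (max 1 (long (Math/ceil (/ (count customers)
                                              (double n-threads)))))]
    (partition-all chunk-size customers)))

;; Ten customers (represented here by plain numbers) over four threads:
(partition-customers 4 (range 10))
;; => ((0 1 2) (3 4 5) (6 7 8) (9))
```

Each chunk is then what a single `process-customers` call receives when the chunks are mapped over with `pmap`.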

2 answers
forever°为你锁心
#2 · 2019-08-16 13:21

You will not see a speed improvement if the function that you want to run in parallel is trivial:

(->> input/flights
     (pmap :id) ;; map or pmap here
     ...)

In that case, the cost of the function being run in parallel (returning the value for the :id key) is much smaller than the cost of dispatching the function calls in parallel and assembling the results, so it's no surprise the sequential version is faster.

See this example:

user=> (time (count (map inc (range 1000))))
"Elapsed time: 0.391237 msecs"
1000
user=> (time (count (pmap inc (range 1000))))
"Elapsed time: 9.451128 msecs"
1000

A trivial operation (inc) is not an expensive computation that would benefit from being performed in parallel. I think the same idea applies to the other uses of pmap in your parallel version of the code.

You might find the example of pmap usage on ClojureDocs useful.
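As a contrast to the trivial `inc` case, here is a minimal sketch of a function where each call is slow enough for `pmap` to pay off. The function `slow-inc` and its 50 ms sleep are made up for illustration; actual timings depend on the machine, so none are shown.

```clojure
(defn slow-inc
  "Hypothetical stand-in for an expensive computation: sleeps 50 ms, then
  increments."
  [n]
  (Thread/sleep 50)
  (inc n))

;; Sequential: the eight sleeps run one after another.
(time (doall (map slow-inc (range 8))))

;; Parallel: the sleeps can overlap, so the elapsed time is usually much
;; shorter than in the sequential run.
(time (doall (pmap slow-inc (range 8))))
```

The `doall` matters in both cases: `map` and `pmap` return lazy sequences, so without it `time` would measure little more than the creation of the sequence.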

#3 · 2019-08-16 13:22

One problem with pmap is that it still guarantees the ordering of its return values. This means that if pmap is working through a batch on, say, 8 cores/workers and one call blocks, the results behind it are held back until that call returns, even if the other workers have already finished.
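The ordering guarantee can be observed with a small sketch. The helper `sleep-then-return` and the `completion-order` atom are hypothetical names used only for this demonstration.

```clojure
(defn sleep-then-return
  "Hypothetical helper: sleeps for `ms` milliseconds and returns `ms`."
  [^long ms]
  (Thread/sleep ms)
  ms)

;; Records the order in which the calls actually finish.
(def completion-order (atom []))

(def results
  (doall (pmap (fn [ms]
                 (let [r (sleep-then-return ms)]
                   (swap! completion-order conj r)
                   r))
               [300 10 10 10])))

;; results is (300 10 10 10): always the same order as the input, so a
;; consumer sees nothing until the 300 ms call has returned.
;; @completion-order usually has the three 10 ms calls first, which shows
;; that they finished early and were simply waiting to be consumed.
```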

Take a look at Claypoole's unordered pmap, upmap, which drops the return-value ordering so that results are returned as soon as they are ready. There, one blocking operation still leaves the other 7 cores free for the faster operations.
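To show the idea behind an unordered map without adding a dependency, here is a rough sketch built on `java.util.concurrent.ExecutorCompletionService`. It is not Claypoole's implementation, and `unordered-pmap` is a hypothetical name; it only demonstrates collecting results in completion order rather than input order.

```clojure
(import '(java.util.concurrent Executors ExecutorCompletionService))

(defn unordered-pmap
  "Hypothetical helper: applies `f` to every item of `coll` on a fixed pool
  of `n-threads` threads and returns a vector of the results in the order
  in which the calls complete. If `f` throws, the exception surfaces
  wrapped in an ExecutionException."
  [n-threads f coll]
  (let [pool  (Executors/newFixedThreadPool n-threads)
        ecs   (ExecutorCompletionService. pool)
        items (vec coll)]
    (try
      ;; submit one task per item
      (doseq [x items]
        (.submit ecs ^Callable (fn [] (f x))))
      ;; .take blocks until the next task to finish is available
      (vec (repeatedly (count items) #(.get (.take ecs))))
      (finally
        (.shutdown pool)))))

;; The slow call no longer holds back the fast ones; the result typically
;; lists the three 10 ms values before the 300 ms one.
(unordered-pmap 4
                (fn [^long ms] (Thread/sleep ms) ms)
                [300 10 10 10])
```

With Claypoole itself, the thread pool is created and managed through the library instead; see its README for the actual API.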
