def trainBestSeller(events: RDD[BuyEvent], n: Int, itemStringIntMap: BiMap[String, Int]): Map[String, Array[(Int, Int)]] = { val itemTemp = events // map item from string to integer index .flatMap { case BuyEvent(user, item, category, count) if itemStringIntMap.contains(item) => Some((itemStringIntMap(item),category),count) case _ => None } // cache to use for next times .cache()
// top view with each category:
val bestSeller_Category: Map[String, Array[(Int, Int)]] = itemTemp.reduceByKey(_ + _)
.map(row => (row._1._2, (row._1._1, row._2)))
.groupByKey
.map { case (c, itemCounts) =>
(c, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n))
}
.collectAsMap.toMap
// top view with all category => cateogory ALL
val bestSeller_All: Map[String, Array[(Int, Int)]] = itemTemp.reduceByKey(_ + _)
.map(row => ("ALL", (row._1._1, row._2)))
.groupByKey
.map {
case (c, itemCounts) =>
(c, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n))
}
.collectAsMap.toMap
// merge 2 map bestSeller_All and bestSeller_Category
val bestSeller = bestSeller_Category ++ bestSeller_All
bestSeller
}