I would like to sort a DataFrame based on a column with my own comparator. Is it possible to do this in Spark SQL?
For example, let's suppose that I have a DataFrame registered as table "MyTable" with a column "Day" whose type is "string":
id | Day
--------------------
1 | Fri
2 | Mon
3 | Sat
4 | Sun
5 | Thu
And I want to execute this query:
SELECT * FROM MyTable ORDER BY Day
I would like to order the column "Day" with my own comparator. I thought about using a UDF, but I don't know if that is possible. Note that I really want to use my comparator in Sort/Order By operations. I don't want to convert the strings in column "Day" to Datetime or anything similar.
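For illustration, this is roughly what I had in mind with a UDF (a hypothetical sketch; the dayIndex name and the week order are just my own guess, and I don't know if a UDF can be used in ORDER BY like this):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Hypothetical: register a UDF mapping each day abbreviation to its week position.
spark.udf.register("dayIndex", (day: String) =>
  Seq("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun").indexOf(day))

spark.sql("SELECT * FROM MyTable ORDER BY dayIndex(Day)")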
In Spark SQL, you do not have a choice and need to use orderBy with one or more columns. With RDDs, you can use a custom Java-like comparator if you feel like it. Indeed, here is the signature of the sortBy method of an RDD (cf. the Scaladoc of Spark 2.4):
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
This means that you can provide an Ordering of your choice, which is exactly like a Java Comparator (Ordering actually inherits from Comparator).
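As a quick illustration of that relationship (a minimal sketch, independent of Spark), any java.util.Comparator can be wrapped into a Scala Ordering:

import java.util.Comparator

// A plain Java comparator, here comparing strings by length.
val byLength = new Comparator[String] {
  override def compare(a: String, b: String): Int = a.length - b.length
}

// Wrap it into a Scala Ordering and use it for sorting.
val ordering: Ordering[String] = Ordering.comparatorToOrdering(byLength)
List("spark", "rdd", "df").sorted(ordering) // List(df, rdd, spark)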
For simplicity, let's say I want to sort by the absolute value of a column 'x' (this can be done without a comparator, but let's assume I need to use one). I start by defining my comparator on rows:
import org.apache.spark.sql.Row

class RowOrdering extends Ordering[Row] {
  // Compare two rows by the absolute value of their "x" column.
  def compare(x: Row, y: Row): Int = x.getAs[Int]("x").abs - y.getAs[Int]("x").abs
}
Now let's define data and sort it:
import spark.implicits._ // for toDF

val df = Seq((0, 1), (1, 2), (2, 4), (3, 7), (4, 1), (5, -1), (6, -2),
  (7, 5), (8, 5), (9, 0), (10, -9)).toDF("id", "x")
val rdd = df.rdd.sortBy(identity)(new RowOrdering(), scala.reflect.classTag[Row])
val sorted_df = spark.createDataFrame(rdd, df.schema)
sorted_df.show
+---+---+
| id| x|
+---+---+
| 9| 0|
| 0| 1|
| 4| 1|
| 5| -1|
| 6| -2|
| 1| 2|
| 2| 4|
| 7| 5|
| 8| 5|
| 3| 7|
| 10| -9|
+---+---+
Another solution is to define an implicit ordering so that you don't need to provide it when sorting.
implicit val ord = new RowOrdering()
df.rdd.sortBy(identity)
Finally, note that df.rdd.sortBy(_.getAs[Int]("x").abs) would achieve the same result. You can also use tuple ordering to do more complex things, such as ordering by absolute value and, when equal, putting the positive values first:
import org.apache.spark.sql.functions.abs

df.rdd.sortBy(x => (x.getAs[Int]("x").abs, -x.getAs[Int]("x"))) // RDD
df.orderBy(abs($"x"), -$"x") // DataFrame
This is the general way of doing it with a DataFrame:
val df = spark.sql("SELECT * FROM MyTable")
df.orderBy("yourcolumn")
See the orderBy docs.
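Applied to the week-day column from the question, one possible sketch (assuming the Day values are the three-letter abbreviations shown above; dayIndex is just an illustrative name) is to order by a derived key:

import org.apache.spark.sql.functions.udf
import spark.implicits._

// Illustrative helper: map each day abbreviation to its position in the week.
val dayIndex = udf((day: String) =>
  Seq("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun").indexOf(day))

spark.table("MyTable").orderBy(dayIndex($"Day"))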
If your data is small (it seems you have week names only), you can collect it as a list and use Scala's sortWith function.
The sortWith function sorts a sequence according to a comparison function: it takes a comparator function and sorts according to it, so you can provide your own custom comparison function.
A different example than yours:
scala> case class Emp(id: Int, name: String, salary: Double)
defined class Emp
scala> val emp1 = Emp(1, "james", 13000.00)
emp1: Emp = Emp(1,james,13000.0)
scala> val emp2 = Emp(2, "michael", 12000.00)
emp2: Emp = Emp(2,michael,12000.0)
scala> val emp3 = Emp(3, "Ram", 15000.00)
emp3: Emp = Emp(3,Ram,15000.0)
scala> val empList = List(emp1,emp2,emp3)
empList: List[Emp] = List(Emp(1,james,13000.0), Emp(2,michael,12000.0), Emp(3,Ram,15000.0))
// sort in descending order on the basis of salary.
scala> empList.sortWith(_.salary > _.salary)
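which returns the list sorted by descending salary:
res0: List[Emp] = List(Emp(3,Ram,15000.0), Emp(1,james,13000.0), Emp(2,michael,12000.0))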
Other options are:
How to sort an RDD in Scala Spark?
To use this option, you need to convert the DataFrame to a pair RDD and then do a sortByKey using the answer given there, as sketched below.
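A minimal sketch of that approach, assuming we sort the question's MyTable by a custom week-day Ordering (sortByKey picks up the implicit Ordering on the key type):

import org.apache.spark.sql.Row

// Custom "comparator": order day abbreviations by their position in the week.
implicit val dayOrdering: Ordering[String] =
  Ordering.by((day: String) => Seq("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun").indexOf(day))

val df = spark.table("MyTable")
// Pair each row with its sort key, sort by key, then drop the key again.
val sortedRows = df.rdd.map(row => (row.getAs[String]("Day"), row)).sortByKey().values
val sortedDf = spark.createDataFrame(sortedRows, df.schema)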