I have a data dump of work orders, shown below. I need to identify the orders that have both an 'In Progress' status and a 'Finished' status.
More precisely, an order should be displayed only if it has an 'In Progress' status together with a 'Finished' or 'Not Valid' status. What is the best approach for this in Spark? The input and the expected output are given below.
Input
Work_Req_Id,Assigned to,Date,Status
R1,John,3/4/15,In Progress
R1,George,3/5/15,In Progress
R2,Peter,3/6/15,In Progress
R3,Alaxender,3/7/15,Finished
R3,Alaxender,3/8/15,In Progress
R4,Patrick,3/9/15,Finished
R4,Patrick,3/10/15,Not Valid
R5,Peter,3/11/15,Finished
R6,,3/12/15,Not Valid
R7,George,3/13/15,Not Valid
R7,George,3/14/15,In Progress
R8,John,3/15/15,Finished
R8,John,3/16/15,Failed
R9,Alaxender,3/17/15,Finished
R9,John,3/18/15,Removed
R10,Patrick,3/19/15,In Progress
R10,Patrick,3/20/15,Finished
R10,Peter,3/21/15,Hold
Output
Work_Req_Id,Assigned to,Date,Status
R3,Alaxender,3/7/15,Finished
R3,Alaxender,3/8/15,In Progress
R7,George,3/13/15,Not Valid
R7,George,3/14/15,In Progress
R10,Patrick,3/19/15,In Progress
R10,Patrick,3/20/15,Finished
R10,Peter,3/21/15,Hold
You can use groupBy with collect_list to collect the list of statuses per Work_Req_Id, along with a UDF to filter for the wanted statuses. The grouped DataFrame is then joined with the original DataFrame to recover all rows of the matching requests.
Window functions aren't proposed here because Spark 1.6 doesn't seem to support collect_list/collect_set in window operations.
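// spark-shell normally pre-imports these; outside the shell, import them explicitly
// (along with sqlContext.implicits._ for toDF and the $"..." column syntax).
import org.apache.spark.sql.functions.{collect_list, udf}
val df = Seq(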
("R1", "John", "3/4/15", "In Progress"),
("R1", "George", "3/5/15", "In Progress"),
("R2", "Peter", "3/6/15", "In Progress"),
("R3", "Alaxender", "3/7/15", "Finished"),
("R3", "Alaxender", "3/8/15", "In Progress"),
("R4", "Patrick", "3/9/15", "Finished"),
("R4", "Patrick", "3/10/15", "Not Valid"),
("R5", "Peter", "3/11/15", "Finished"),
("R6", "", "3/12/15", "Not Valid"),
("R7", "George", "3/13/15", "Not Valid"),
("R7", "George", "3/14/15", "In Progress"),
("R8", "John", "3/15/15", "Finished"),
("R8", "John", "3/16/15", "Failed"),
("R9", "Alaxender", "3/17/15", "Finished"),
("R9", "John", "3/18/15", "Removed"),
("R10", "Patrick", "3/19/15", "In Progress"),
("R10", "Patrick", "3/20/15", "Finished"),
("R10", "Peter", "3/21/15", "Hold")
).toDF("Work_Req_Id", "Assigned_To", "Date", "Status")
// True when a request has "In Progress" together with "Finished" or "Not Valid".
val wanted = udf(
  (statuses: Seq[String]) => statuses.contains("In Progress") &&
    (statuses.contains("Finished") || statuses.contains("Not Valid"))
)
// Collect the statuses per request and keep only the IDs of the wanted requests.
val df2 = df.groupBy($"Work_Req_Id").agg(collect_list($"Status").as("Statuses")).
  where( wanted($"Statuses") ).
  drop($"Statuses")
// Join back to the original rows to get every record of those requests.
df.join(df2, Seq("Work_Req_Id")).show
// +-----------+-----------+-------+-----------+
// |Work_Req_Id|Assigned_To|   Date|     Status|
// +-----------+-----------+-------+-----------+
// |         R3|  Alaxender| 3/7/15|   Finished|
// |         R3|  Alaxender| 3/8/15|In Progress|
// |         R7|     George|3/13/15|  Not Valid|
// |         R7|     George|3/14/15|In Progress|
// |        R10|    Patrick|3/19/15|In Progress|
// |        R10|    Patrick|3/20/15|   Finished|
// |        R10|      Peter|3/21/15|       Hold|
// +-----------+-----------+-------+-----------+
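If you are on a newer Spark version, the window-function route mentioned above may be worth trying. The following is only a sketch, under the assumption that you run Spark 2.x or later, where collect_set should be accepted as a window function; it is not something that applies to Spark 1.6. The names sample, statuses and result are made up for illustration, and the data is a reduced subset of the input. It replaces the UDF with the built-in array_contains and avoids the join back to the original DataFrame.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{array_contains, col, collect_set}

// Assumption: Spark 2.x or later. In spark-shell a session already exists as `spark`.
val spark = SparkSession.builder.master("local[*]").appName("status-filter").getOrCreate()
import spark.implicits._

// A reduced sample of the input: R3 and R7 qualify, R1 and R4 do not.
val sample = Seq(
  ("R1", "John", "3/4/15", "In Progress"),
  ("R1", "George", "3/5/15", "In Progress"),
  ("R3", "Alaxender", "3/7/15", "Finished"),
  ("R3", "Alaxender", "3/8/15", "In Progress"),
  ("R4", "Patrick", "3/9/15", "Finished"),
  ("R4", "Patrick", "3/10/15", "Not Valid"),
  ("R7", "George", "3/13/15", "Not Valid"),
  ("R7", "George", "3/14/15", "In Progress")
).toDF("Work_Req_Id", "Assigned_To", "Date", "Status")

// Attach the distinct statuses of each request to every one of its rows.
val statuses = collect_set(col("Status")).over(Window.partitionBy(col("Work_Req_Id")))

// Keep rows whose request has "In Progress" plus "Finished" or "Not Valid".
val result = sample
  .withColumn("Statuses", statuses)
  .where(
    array_contains(col("Statuses"), "In Progress") &&
      (array_contains(col("Statuses"), "Finished") ||
        array_contains(col("Statuses"), "Not Valid")))
  .drop("Statuses")

result.show()
```

Because every row already carries its request's status set, no second DataFrame or join is needed. Row order in the output is not guaranteed, so add an orderBy if you need a stable listing.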