Let's say I have two tables, each with a column that contains timestamps for various events. The timestamp values in the two tables are different, since they are for different events. I want to join the two tables such that every record in table1 is joined with the first lower timestamp in table2. For example:
Table1    Table2
142.13    141.16
157.34    145.45
168.45    155.85
170.23    166.76
          168.44
The joined table should be:
142.13,141.16
157.34,155.85
168.45,166.76
170.23,168.44
I am using Apache Spark SQL.
I am a noob in SQL and this doesn't look like a job for a noob :). Thanks.
Ditto has shown the straightforward way to solve this. If Apache Spark really has problems with this very basic query, then join first (which can lead to a big intermediate result) and aggregate afterwards:
select t1.v, max(t2.v)
from table1 t1
join table2 t2 on t2.v <= t1.v
group by t1.v
order by t1.v;
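If you want to run this statement from Spark itself, here is a minimal sketch; it assumes spark is your SparkSession and that table1 and table2 are registered as temp views whose timestamp column is named v:

// Run the join-then-aggregate query through Spark SQL.
val joined = spark.sql("""
  select t1.v, max(t2.v) as t2_v
  from table1 t1
  join table2 t2 on t2.v <= t1.v
  group by t1.v
  order by t1.v
""")
joined.show()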
Try this:
with t1 as (
select 142.13 v from dual union all
select 157.34 v from dual union all
select 168.45 v from dual union all
select 170.23 v from dual
),
t2 as (
select 141.16 v from dual union all
select 145.45 v from dual union all
select 155.85 v from dual union all
select 166.76 v from dual union all
select 168.44 v from dual
)
select v, ( select max(v) from t2 where t2.v <= t1.v )
from t1;
         V (SELECTMAX(V)FROMT2WHERET2.V<=T1.V)
---------- -----------------------------------
    142.13                              141.16
    157.34                              155.85
    168.45                              168.44
    170.23                              168.44

4 rows selected.
the WITH clause is just me faking the data ...
the simplified query is just:
select t1.v, ( select max(t2.v) from table2 t2 where t2.v <= t1.v ) from table1 t1
[edit]
admittedly, I'm not familiar with Spark ... but this is simple enough SQL ... I'm assuming it works :)
[/edit]
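Note that some Spark versions only support equality predicates in correlated subqueries, so the scalar subquery above may be rejected. As a fallback, here is a minimal DataFrame API sketch of the same greatest-lower-value logic; it assumes table1 and table2 are DataFrames whose timestamp column is named v, as in the simplified query:

import org.apache.spark.sql.functions.max
import spark.implicits._  // for the $"..." column syntax; `spark` is the SparkSession

// Left join keeps table1 rows with no lower match (they get null),
// mirroring the correlated subquery's behaviour.
val result = table1.as("a")
  .join(table2.as("b"), $"b.v" <= $"a.v", "left")
  .groupBy($"a.v")
  .agg(max($"b.v").as("t2_v"))
  .orderBy("v")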
If you are using Apache Spark SQL, you can join these two tables as DataFrames after adding an index column with monotonically_increasing_id():
import spark.implicits._  // needed for toDF
import org.apache.spark.sql.functions.monotonically_increasing_id

val t1 = spark.sparkContext.parallelize(Seq(142.13, 157.34, 168.45, 170.23)).toDF("c1")
val t2 = spark.sparkContext.parallelize(Seq(141.16, 145.45, 155.85, 166.76, 168.44)).toDF("c2")
val t11 = t1.withColumn("id", monotonically_increasing_id())
val t22 = t2.withColumn("id", monotonically_increasing_id())
val res = t11.join(t22, t11("id") + 1 === t22("id")).drop("id")
Output:
+------+------+
| c1| c2|
+------+------+
|142.13|145.45|
|168.45|166.76|
|157.34|155.85|
|170.23|168.44|
+------+------+
Hope this helps
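One caveat: monotonically_increasing_id() only guarantees IDs that are increasing and unique, not consecutive, so the id + 1 join can silently drop rows once the data spans more than one partition. A sketch of a more robust variant, using a hypothetical withRowIndex helper built on zipWithIndex to get truly consecutive indices:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper: append a consecutive 0-based index column to a DataFrame.
def withRowIndex(df: DataFrame, idCol: String): DataFrame = {
  val indexed = df.rdd.zipWithIndex.map { case (row, idx) =>
    Row.fromSeq(row.toSeq :+ idx)
  }
  val schema = StructType(df.schema.fields :+ StructField(idCol, LongType, nullable = false))
  df.sparkSession.createDataFrame(indexed, schema)
}

val t11 = withRowIndex(t1, "id")
val t22 = withRowIndex(t2, "id")
val res = t11.join(t22, t11("id") + 1 === t22("id")).drop("id")

Even then, this pairs rows purely by position; the SQL answers above match on the actual timestamp values, which is closer to what the question asks for.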