I have noticed that Apache Flink does not optimise the order in which tables are joined. At the moment, it keeps the user-specified join order (basically, it takes the query literally). I suppose that Apache Calcite can optimise the order of joins, but for some reason these rules are not used in Apache Flink.
If, for example, we have two tables 'R' and 'S':
private val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
private val fileNumber = 1
tableEnv.registerTableSource("R", getDataSourceR(fileNumber))
tableEnv.registerTableSource("S", getDataSourceS(fileNumber))
private val r = tableEnv.scan("R")
private val s = tableEnv.scan("S")
and we suppose that 'S' is empty and we want to join these tables in two ways:
val tableOne = r.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
  .join(s.as("x5, x6")).where("x4 === x5").select("x1, x6")
val tableTwo = s.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
  .join(r.as("x5, x6")).where("x4 === x5").select("x1, x6")
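If we count the number of rows in tableOne and in tableTwo, the result is zero in both cases, because 'S' is empty. The problem is that evaluating tableOne takes much longer than evaluating tableTwo: tableOne first computes the full join of 'R' with itself and only then joins with the empty 'S', whereas tableTwo starts from the empty 'S', so its first join already produces no rows.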
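To make the cost difference concrete, here is a minimal sketch that uses plain Scala collections rather than Flink. Everything in it is an assumption for illustration only: the contents of R are made up (100 rows with ten distinct key values), and the `join` helper is a hypothetical nested-loop join that merely mimics the `where`/`select` pattern of the two queries so that the intermediate row counts can be compared.

```scala
// Illustration only: plain Scala collections, not the Flink Table API.
object JoinOrderSketch {
  type Row = (Int, Int)

  // Hypothetical nested-loop equi-join: matches left._2 with right._1 and
  // keeps (left._1, right._2), like where("x2 === x3").select("x1, x4").
  def join(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    for {
      (a, b) <- left
      (c, d) <- right
      if b == c
    } yield (a, d)

  // Made-up data: R has 100 rows over ten key values, S is empty.
  val rRows: Seq[Row] = (0 until 100).map(i => (i % 10, i % 10))
  val sRows: Seq[Row] = Seq.empty

  // Order used by tableOne: (R join R) join S
  val firstJoinOne: Seq[Row] = join(rRows, rRows)
  val tableOne: Seq[Row] = join(firstJoinOne, sRows)

  // Order used by tableTwo: (S join R) join R
  val firstJoinTwo: Seq[Row] = join(sRows, rRows)
  val tableTwo: Seq[Row] = join(firstJoinTwo, rRows)

  def main(args: Array[String]): Unit = {
    println(s"tableOne: intermediate rows = ${firstJoinOne.size}, result rows = ${tableOne.size}")
    println(s"tableTwo: intermediate rows = ${firstJoinTwo.size}, result rows = ${tableTwo.size}")
  }
}
```

With this made-up data, both orders end with an empty result, but the first order materialises a non-empty intermediate join that is then thrown away, while the second order never produces any intermediate rows.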
Is there any way to have the order in which the joins are executed optimised automatically, or even to enable cost-based plan optimisation by supplying some statistics? How can these statistics be added?
The documentation at this link suggests that it may be necessary to change the table environment's CalciteConfig, but it is not clear to me how to do that.
Please help.