Apache Flink - enable join ordering

Published 2019-07-18 10:12

Question:

I have noticed that Apache Flink does not optimise the order in which tables are joined. At the moment it keeps the user-specified join order (basically, it takes the query literally). I suppose that Apache Calcite can optimise the order of joins, but for some reason these rules are not in use in Apache Flink.

If, for example, we have two tables 'R' and 'S':

private val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
private val fileNumber = 1
tableEnv.registerTableSource("R", getDataSourceR(fileNumber))
tableEnv.registerTableSource("S", getDataSourceS(fileNumber))
private val r = tableEnv.scan("R")
private val s = tableEnv.scan("S")

and we suppose that 'S' is empty and we want to join these tables in two ways:

val tableOne = r.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
  .join(s.as("x5, x6")).where("x4 === x5").select("x1, x6")

val tableTwo = s.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
  .join(r.as("x5, x6")).where("x4 === x5").select("x1, x6")

If we count the number of rows in tableOne and in tableTwo, the result is zero in both cases. The problem is that evaluating tableOne takes much longer than evaluating tableTwo, because tableOne first joins 'R' with itself and only then joins with the empty table 'S', whereas tableTwo starts from the empty table.

Is there any way to automatically optimise the order in which the joins are executed, or even to enable cost-based planning by adding some statistics? How can these statistics be added?

The linked documentation suggests that it may be necessary to change the table environment's CalciteConfig, but it is not clear to me how to do that.

Please help.

Answer 1:

Join reordering is not enabled because Flink does not handle statistics well. Reordering joins without reasonably accurate cardinality estimates is basically gambling. Therefore, join reordering is disabled and tables are joined in the order provided by the user. This gives deterministic and controllable behavior.

However, you can pass optimization rules into the optimizer by supplying a TableConfig with a CalciteConfig when creating the TableEnvironment, i.e., TableEnvironment.getTableEnvironment(env, yourTableConfig). In the CalciteConfig you can add optimization rules to different optimization phases. You probably want to add JoinCommuteRule and JoinAssociateRule to the logical optimization phase. You will probably also have to dig into the code to check how to pass statistics into the optimizer.
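
A minimal sketch of that configuration is shown below. It assumes the legacy planner API of Flink 1.5 to 1.8 (CalciteConfig.createBuilder(), addLogicalOptRuleSet and TableConfig.setCalciteConfig) and a Calcite version that still exposes the rules as JoinCommuteRule.INSTANCE and JoinAssociateRule.INSTANCE; the object name JoinReorderingSketch is made up for illustration. It only wires the rules in and has not been checked against a specific release. Without statistics the cost model may still not pick the better plan.

```scala
import org.apache.calcite.rel.rules.{JoinAssociateRule, JoinCommuteRule}
import org.apache.calcite.tools.RuleSets
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{TableConfig, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.calcite.CalciteConfig

// Hypothetical wrapper object, used only to make the sketch self-contained.
object JoinReorderingSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Add (not replace) the two Calcite join rules to Flink's default
    // logical optimization rule set.
    val calciteConfig = CalciteConfig.createBuilder()
      .addLogicalOptRuleSet(
        RuleSets.ofList(JoinCommuteRule.INSTANCE, JoinAssociateRule.INSTANCE))
      .build()

    // Attach the Calcite configuration to the table configuration.
    val tableConfig = new TableConfig()
    tableConfig.setCalciteConfig(calciteConfig)

    // Create the table environment with the customized optimizer settings;
    // tables registered on it are planned with the additional rules.
    val tableEnv: BatchTableEnvironment =
      TableEnvironment.getTableEnvironment(env, tableConfig)
  }
}
```

The rules are added rather than substituted so that the rest of Flink's default logical rule set stays in place. Note that commute and associate rules can enlarge the planner's search space considerably for queries with many joins.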