Does Flink SQL support Java Map types?

Published 2019-08-03 06:20

Question:

I'm trying to access a key from a map using Flink's SQL API. It fails with the error Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY. Please advise how I can fix it. Here is my event class:

import java.util.Map;

public class EventHolder {

    private Map<String, String> event;

    public Map<String, String> getEvent() {
        return event;
    }

    public void setEvent(Map<String, String> event) {
        this.event = event;
    }
}

Here is the main class that submits the Flink job:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapTableSource {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());

        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream);
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());

        Table alerts = tableEnv.sql(
                "select event['key'] from mapEvent ");

        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);

        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;

            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is: " + value);
                return value != null;
            }
        });

        env.execute("map-tablsource-job");
    }

    private static List<EventHolder> getMaps() {
        List<EventHolder> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            EventHolder holder = new EventHolder();
            Map<String, String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}

When I run it I get the following exception:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)

I'm using Flink 1.3.1.

Answer 1:

I think the problem lies in fromCollection. Flink is not able to extract the needed type information because of Java limitations (i.e. type erasure). Therefore your map is treated as a black box with the SQL ANY type. You can verify the types of your table by using tableEnv.scan("mapEvent").printSchema(). You can specify the type information in fromCollection with Types.MAP(Types.STRING, Types.STRING).
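A minimal sketch of what that could look like, reusing env, tableEnv, and getMaps() from the question. Types below refers to org.apache.flink.api.common.typeinfo.Types; depending on the Flink version, the POJO/MAP helpers may not be available and a MapTypeInfo/PojoTypeInfo would have to be constructed directly.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

import java.util.HashMap;
import java.util.Map;

// Declare the type of the "event" field explicitly so it is not extracted
// as a generic (ANY) type.
Map<String, TypeInformation<?>> fields = new HashMap<>();
fields.put("event", Types.MAP(Types.STRING, Types.STRING));

TypeInformation<EventHolder> eventHolderType = Types.POJO(EventHolder.class, fields);

DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps(), eventHolderType);

// Check what the table environment actually sees for the registered table.
tableEnv.registerDataStream("mapEvent", mapEventStream);
tableEnv.scan("mapEvent").printSchema();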



Answer 2:

I solved a similar issue with the following:

import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

// Should probably make MapVal more generic, but it works for this example
public class MapVal extends ScalarFunction {
    public String eval(Map<String, String> obj, String key) {
        return obj.get(key);
    }
}

public class Car {
    private String make;
    private String model;
    private int year;
    private Map<String, String> attributes;
    // getters/setters...
}

// After registering the stream, TableEnvironment, etc.

tableEnv.registerFunction("mapval", new MapVal());

Table cars = tableEnv
        .scan("Cars")
        .select("make, model, year, attributes.mapval('name')");
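Once the function is registered, the same lookup should also be callable from a SQL query instead of the Table API; a hedged sketch (not from the original answer), reusing the table and function names above:

Table carsViaSql = tableEnv.sql(
        "SELECT make, model, mapval(attributes, 'name') FROM Cars");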