I'm trying to access a key from a map using Flink's SQL API. It fails with the error: Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY. How can I fix it? Here is my event class:
public class EventHolder {
    private Map<String,String> event;

    public Map<String, String> getEvent() {
        return event;
    }

    public void setEvent(Map<String, String> event) {
        this.event = event;
    }
}
Here is the main class that submits the Flink job:
public class MapTableSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());
        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream);
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());
        Table alerts = tableEnv.sql(
            "select event['key'] from mapEvent ");
        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);
        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;
            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is:"+value);
                return value!=null;
            }
        });
        env.execute("map-tablsource-job");
    }

    private static List<EventHolder> getMaps(){
        List<EventHolder> list = new ArrayList<>();
        for(int i=0;i<5;i++){
            EventHolder holder = new EventHolder();
            Map<String,String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}
When I run it, I get this exception:
Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)
I'm using Flink 1.3.1.
I solved a similar issue with the following:
I think the problem lies in fromCollection. Flink is not able to extract the needed type information because of Java limitations (i.e. type erasure). Therefore your map is treated as a black box with the SQL ANY type. You can verify the types of your table by using tableEnv.scan("mapEvent").printSchema(). You can specify the type information in fromCollection with Types.MAP(Types.STRING, Types.STRING).
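To illustrate the type erasure point, here is a minimal plain-Java sketch. It is not Flink code: the class name ErasureDemo and its helper methods are made up for this example. It only shows that a map instance carries no record of its key and value types at runtime, which is why they have to be supplied explicitly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of Flink or of the question's code.
public class ErasureDemo {

    // Returns the only type information an instance carries at runtime:
    // its raw class name, without any generic type arguments.
    public static String runtimeType(Object value) {
        return value.getClass().getName();
    }

    // Two maps declared with different type arguments still share
    // one and the same runtime class.
    public static boolean sameRuntimeClass(Map<?, ?> a, Map<?, ?> b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        Map<String, String> strings = new HashMap<>();
        strings.put("key", "value");
        Map<Integer, Long> numbers = new HashMap<>();

        // Prints "java.util.HashMap": the <String, String> part is gone.
        System.out.println(runtimeType(strings));

        // Prints "true": both maps look identical to code that only sees instances.
        System.out.println(sameRuntimeClass(strings, numbers));
    }
}
```

Code that is handed only such instances cannot tell a Map<String, String> from any other map, so the key and value types must come from explicitly provided type information.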