Creating a new accumulator on each update step of a UDAF

Published 2019-07-25 11:23

Question:

I am implementing a UDAF following the UDAF example. The update phase there looks like this:

    public void update(MutableAggregationBuffer buffer, Row input) {
        if (!input.isNullAt(0)) {
            String inputKey = input.getString(0);
            Map<String, Long> inputValues = input.<String, Long>getJavaMap(1);
            Map<String, Map<String, Long>> newData = new HashMap<>();

            if (!buffer.isNullAt(0)) {
                Map<String, Map<String, Long>> currData = buffer.<String, Map<String, Long>>getJavaMap(0);
                newData.putAll(currData);
            }
            newData.put(inputKey, inputValues);
            buffer.update(0, newData);
        }
    }

You can see that on every step a new HashMap is created (newData) and the data from the previous buffer is copied into it. This looks like an awful waste: creating a new Map and copying all the elements on every update. So I tried the following (in my case the Map has slightly different types):

    bufferJavaMap = buffer.<String, Integer>getJavaMap(0);
    bufferJavaMap.put("aaaa", 1);
    buffer.update(0, bufferJavaMap);

I receive the following error:

    java.lang.UnsupportedOperationException
        at java.util.AbstractMap.put(AbstractMap.java:209)
        at dns.MergeMapUDAF.update(MergeMapUDAF.java:84)

Isn't it possible to update the existing Map? What is the best way to update this Map?

Answer 1:

Isn't it possible to update the existing Map?

It is not possible, but the problem is more complex than the one you identified. Spark makes a full copy of the structure on both get and update, so even removing the explicit copy wouldn't resolve the problem.
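As for the UnsupportedOperationException itself: getJavaMap returns a read-only Java wrapper around Spark's internal Scala map, which is why put fails. A minimal sketch of a way around the exception (assuming the same buffer layout as in your snippet) is to copy into a mutable HashMap first, though per the point above this does not remove the underlying copying cost:

    // getJavaMap(0) yields a read-only view; copy it into a mutable HashMap
    Map<String, Integer> writable = new HashMap<>(buffer.<String, Integer>getJavaMap(0));
    writable.put("aaaa", 1);
    // update(0, ...) still copies the structure internally
    buffer.update(0, writable);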

If performance is a concern, you should avoid using UserDefinedAggregateFunction with non-atomic types.
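A sketch of the usual alternative is a typed Aggregator, whose buffer stays a plain JVM object between rows and can therefore be mutated in place. The Entry bean and the single-level Map<String, Long> buffer below are simplifications of the question's map-of-maps, introduced here only for illustration:

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.expressions.Aggregator;

    public class MergeMapAggregator
            extends Aggregator<MergeMapAggregator.Entry, HashMap<String, Long>, Map<String, Long>> {

        // Hypothetical input bean standing in for the question's input Row.
        public static class Entry implements Serializable {
            private String key;
            private Long value;
            public String getKey() { return key; }
            public void setKey(String key) { this.key = key; }
            public Long getValue() { return value; }
            public void setValue(Long value) { this.value = value; }
        }

        @Override
        public HashMap<String, Long> zero() {
            return new HashMap<>();  // empty accumulator
        }

        @Override
        public HashMap<String, Long> reduce(HashMap<String, Long> buffer, Entry input) {
            // The buffer is an ordinary JVM object here, so it is mutated
            // in place instead of being copied on every row.
            buffer.put(input.getKey(), input.getValue());
            return buffer;
        }

        @Override
        public HashMap<String, Long> merge(HashMap<String, Long> b1, HashMap<String, Long> b2) {
            b1.putAll(b2);
            return b1;
        }

        @Override
        public Map<String, Long> finish(HashMap<String, Long> reduction) {
            return reduction;
        }

        @Override
        @SuppressWarnings("unchecked")
        public Encoder<HashMap<String, Long>> bufferEncoder() {
            // Kryo keeps the buffer as a JVM object between updates instead
            // of converting it to Spark's internal row format each time.
            return Encoders.kryo((Class<HashMap<String, Long>>) (Class<?>) HashMap.class);
        }

        @Override
        @SuppressWarnings("unchecked")
        public Encoder<Map<String, Long>> outputEncoder() {
            return Encoders.kryo((Class<Map<String, Long>>) (Class<?>) Map.class);
        }
    }

It would be applied with something like ds.groupByKey(Entry::getKey, Encoders.STRING()).agg(new MergeMapAggregator().toColumn()), where ds is a Dataset<Entry>.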