I'm working on a UDAF that returns an array of elements.
The input for each update is a tuple of index and value.
What the UDAF does is to sum all the values under the same index.
Example:
For input(index,value) : (2,1), (3,1), (2,3)
should return (0,0,4,1,...,0)
The logic works fine, but I have an issue with the update method, my implementation only updates 1 cell for each row, but the last assignment in that method actually copies the entire array - which is redundant and extremely time consuming.
This assignment alone is responsible for 98% of my query execution time.
My question is, how can I reduce that time? Is it possible to assign 1 value in the buffer array without having to replace the entire buffer?
P.S.: I'm working with Spark 1.6, and I cannot upgrade it anytime soon, so please stick to solution that would work with this version.
class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{
val bucketSize = 1000
def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)
def dataType: DataType = ArrayType(LongType)
def deterministic: Boolean = true
def bufferSchema: StructType = {
StructType(
StructField("buckets", ArrayType(LongType)) :: Nil
)
}
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new Array[Long](bucketSize)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val index = input.getLong(0)
val value = input.getLong(1)
val arr = buffer.getAs[mutable.WrappedArray[Long]](0)
buffer(0) = arr // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
for(i <- arr1.indices){
arr1.update(i, arr1(i) + arr2(i))
}
buffer1(0) = arr1
}
override def evaluate(buffer: Row): Any = {
buffer.getAs[mutable.WrappedArray[Long]](0)
}
}
TL;DR Either don't use UDAF or use primitive types in place of
ArrayType
.Without
UserDefinedFunction
Both solutions should skip expensive juggling between internal and external representation.
Using standard aggregates and
pivot
This uses standard SQL aggregations. While optimized internally it might be expensive when number of keys and size of the array grow.
Given input:
You can:
Using RDD API with
combineByKey
/aggregateByKey
.Plain old
byKey
aggregation with mutable buffer. No bells and whistles but should perform reasonably well with wide range of inputs. If you suspect input to be sparse, you may consider more efficient intermediate representation, like mutableMap
.Using
UserDefinedFunction
with primitive typesAs far as I understand the internals, performance bottleneck is
ArrayConverter.toCatalystImpl
.It look like it is called for each call
MutableAggregationBuffer.update
, and in turn allocates newGenericArrayData
for eachRow
.If we redefine
bufferSchema
as:both
update
andmerge
can be expressed as plain replacements of primitive values in the buffer. Call chain will remain pretty long, but it won't require copies / conversions and crazy allocations. Omittingnull
checks you'll need something similar toand
respectively.
Finally
evaluate
should takeRow
and convert it to outputSeq
:Please note that in this implementation a possible bottleneck is
merge
. While it shouldn't introduce any new performance problems, with M buckets, each call tomerge
is O(M).With K unique keys, and P partitions it will be called M * K times in the worst case scenario, where each key, occurs at least once on each partition. This effectively increases complicity of the
merge
component to O(M * N * K).In general there is not much you can do about it. However if you make specific assumptions about the data distribution (data is sparse, key distribution is uniform), you can shortcut things a bit, and shuffle first:
If the assumptions are satisfied it should:
Rows
.However if one or both assumptions are not satisfied, you can expect that shuffle size will increase while number of updates will stay the same. At the same time data skews can make things even worse than in
update
-shuffle
-merge
scenario.Using
Aggregator
with "strongly" typedDataset
:which could be used as shown below