Swift Metal parallel sum calculation of array on iOS

2019-04-15 05:52发布

问题:

Based on @Kametrixom's answer, I have written a test application that calculates the sum of an array in parallel.

My test application looks like this:

import UIKit
import Metal

/// Benchmark screen that sums a large integer array four ways (a plain CPU loop,
/// four concurrent CPU operations, one GPU pass, and repeated GPU passes) and
/// prints each result together with the time it took.
class ViewController: UIViewController {

    // Data type, has to be the same as in the shader
    typealias DataType = CInt

    // MARK: - Metal state (created once, on first use)

    /// The system default GPU. Creating it is costly, so it is shared by every pass.
    private lazy var device: MTLDevice = MTLCreateSystemDefaultDevice()!

    /// Compute pipeline for the `parsum` kernel from the default library.
    private lazy var pipeline: MTLComputePipelineState = {
        let parsum = self.device.newDefaultLibrary()!.newFunctionWithName("parsum")!
        return try! self.device.newComputePipelineStateWithFunction(parsum)
    }()

    /// Command queue used to submit every `parsum` pass.
    private lazy var commandQueue: MTLCommandQueue = self.device.newCommandQueue()

    /// Owns the memory behind the pointer most recently returned by `sumParralel1`.
    /// Without this reference the Metal buffer could be released as soon as that
    /// function returns, leaving the caller with a dangling pointer.
    private var retainedResultsBuffer: MTLBuffer?

    // MARK: - Benchmark

    override func viewDidLoad() {
        super.viewDidLoad()

        // Ten million elements, each equal to 200; the total (2,000,000,000) still fits in a CInt.
        let data = (0..<10000000).map{ _ in DataType(200) }

        var start, end : UInt64

        // Baseline: single-threaded loop on the CPU.
        var result:DataType = 0
        start = mach_absolute_time()
        data.withUnsafeBufferPointer { buffer in
            for elem in buffer {
                result += elem
            }
        }
        end = mach_absolute_time()

        print("CPU result: \(result), time: \(secondsBetween(start, end))")

        // Four concurrent operations on the CPU.
        result = 0
        start = mach_absolute_time()
        result = sumParallel4(data)
        end = mach_absolute_time()

        print("Metal result: \(result), time: \(secondsBetween(start, end))")

        // One GPU pass, partial sums added up on the CPU.
        result = 0
        start = mach_absolute_time()
        result = sumParralel(data)
        end = mach_absolute_time()

        print("Metal result: \(result), time: \(secondsBetween(start, end))")

        // Repeated GPU passes until fewer than 100 partial sums remain.
        result = 0
        start = mach_absolute_time()
        result = sumParallel3(data)
        end = mach_absolute_time()

        print("Metal result: \(result), time: \(secondsBetween(start, end))")
    }

    /// Converts the span between two `mach_absolute_time()` readings to seconds.
    ///
    /// `mach_absolute_time()` counts in device-specific ticks, not nanoseconds, so the
    /// difference has to be scaled by the ratio reported by `mach_timebase_info`.
    private func secondsBetween(start: UInt64, _ end: UInt64) -> Double {
        var timebase = mach_timebase_info_data_t()
        mach_timebase_info(&timebase)

        // Fall back to a 1:1 ratio if the timebase could not be read, so we never divide by zero.
        let numer = timebase.denom == 0 ? 1.0 : Double(timebase.numer)
        let denom = timebase.denom == 0 ? 1.0 : Double(timebase.denom)

        let nanoseconds = Double(end - start) * numer / denom
        return nanoseconds / Double(NSEC_PER_SEC)
    }

    // MARK: - GPU summation

    /// Runs one pass of the `parsum` kernel over `data` and waits for it to finish.
    ///
    /// The input is split into slices of `sqrt(count)` elements (rounded down) and the
    /// kernel is dispatched with enough threads for one thread per slice.
    ///
    /// - Returns: The buffer holding the partial sums and how many of them there are,
    ///   or `nil` when `data` is empty.
    private func dispatchPartialSums(data: [DataType]) -> (buffer: MTLBuffer, count: Int)? {
        let count = data.count

        // With no elements `elementsPerSum` would be 0 and the rounded-up division below would trap.
        guard count > 0 else { return nil }

        let elementsPerSum: Int = Int(sqrt(Double(count)))
        let resultsCount = (count + elementsPerSum - 1) / elementsPerSum // Number of individual results = count / elementsPerSum (rounded up)

        var dataCount = CUnsignedInt(count)
        var elementsPerSumC = CUnsignedInt(elementsPerSum)

        let dataBuffer = device.newBufferWithBytes(data, length: strideof(DataType) * count, options: []) // Our data in a buffer (copied)
        let resultsBuffer = device.newBufferWithLength(strideof(DataType) * resultsCount, options: []) // A buffer for individual results (zero initialized)

        let cmds = commandQueue.commandBuffer()
        let encoder = cmds.computeCommandEncoder()

        encoder.setComputePipelineState(pipeline)

        encoder.setBuffer(dataBuffer, offset: 0, atIndex: 0)
        encoder.setBytes(&dataCount, length: sizeofValue(dataCount), atIndex: 1)
        encoder.setBuffer(resultsBuffer, offset: 0, atIndex: 2)
        encoder.setBytes(&elementsPerSumC, length: sizeofValue(elementsPerSumC), atIndex: 3)

        // We have to calculate the sum `resultsCount` times => amount of threadgroups is `resultsCount` / `threadExecutionWidth` (rounded up) because each threadgroup will process `threadExecutionWidth` threads
        let executionWidth = pipeline.threadExecutionWidth
        let threadgroupsPerGrid = MTLSize(width: (resultsCount + executionWidth - 1) / executionWidth, height: 1, depth: 1)

        // Each threadgroup processes `threadExecutionWidth` threads; for performance the width should be a multiple of `threadExecutionWidth` (here 1 times)
        let threadsPerThreadgroup = MTLSize(width: executionWidth, height: 1, depth: 1)

        encoder.dispatchThreadgroups(threadgroupsPerGrid, threadsPerThreadgroup: threadsPerThreadgroup)
        encoder.endEncoding()

        cmds.commit()
        cmds.waitUntilCompleted()

        return (buffer: resultsBuffer, count: resultsCount)
    }

    /// Sums `data` with a single GPU pass followed by a CPU loop over the partial sums.
    ///
    /// - Returns: The sum of all elements, or 0 for an empty array.
    func sumParralel(data : Array<DataType>) -> DataType {
        guard let partial = dispatchPartialSums(data) else { return 0 }

        var result : DataType = 0

        // Keep the Metal buffer alive while its contents are read through a raw pointer.
        withExtendedLifetime(partial.buffer) {
            let results = UnsafeBufferPointer<DataType>(start: UnsafePointer(partial.buffer.contents()), count: partial.count)
            for elem in results {
                result += elem
            }
        }

        return result
    }

    /// Runs a single GPU pass over `data` and exposes the partial sums without copying them.
    ///
    /// - Returns: A view of the partial sums (empty for an empty array). The pointer is
    ///   only valid until the next call to this method, because the backing Metal buffer
    ///   is replaced at that point.
    func sumParralel1(data : Array<DataType>) -> UnsafeBufferPointer<DataType> {
        guard let partial = dispatchPartialSums(data) else {
            return UnsafeBufferPointer<DataType>(start: nil, count: 0)
        }

        // Hold on to the buffer so the returned pointer outlives this call.
        retainedResultsBuffer = partial.buffer

        return UnsafeBufferPointer<DataType>(start: UnsafePointer(partial.buffer.contents()), count: partial.count)
    }

    /// Sums `data` by feeding the partial sums back through the GPU until fewer than
    /// 100 values remain, then adds those up on the CPU.
    ///
    /// - Returns: The sum of all elements, or 0 for an empty array.
    func sumParallel3(data : Array<DataType>) -> DataType {
        var results = sumParralel1(data)

        // `Array(results)` copies the previous partial sums before the next pass replaces their buffer.
        // An extra pass never changes the total, so none is run once fewer than 100 values are left.
        while results.count >= 100 {
            results = sumParralel1(Array(results))
        }

        return results.reduce(0, combine: +)
    }

    // MARK: - CPU summation on four operations

    /// Sums `data` on the CPU by splitting it into four quarters and adding each quarter
    /// up in its own operation on a queue that runs up to four operations at once.
    ///
    /// - Returns: The sum of all elements.
    func sumParallel4(data : Array<DataType>) -> DataType {

        let queue = NSOperationQueue()
        queue.maxConcurrentOperationCount = 4

        let count = data.count
        let quarter = count / 4
        let half = count / 2
        let threeQuarters = 3 * count / 4

        // Adds up one slice in a local variable, so the hot loop does not go through a captured variable.
        func chunkSum(range: Range<Int>) -> DataType {
            var sum : DataType = 0
            for i in range {
                sum += data[i]
            }
            return sum
        }

        // One result slot per operation; each slot is written by exactly one operation.
        var a0 : DataType = 0
        var a1 : DataType = 0
        var a2 : DataType = 0
        var a3 : DataType = 0

        let operations = [
            NSBlockOperation(block: { a0 = chunkSum(0..<quarter) }),
            NSBlockOperation(block: { a1 = chunkSum(quarter..<half) }),
            NSBlockOperation(block: { a2 = chunkSum(half..<threeQuarters) }),
            NSBlockOperation(block: { a3 = chunkSum(threeQuarters..<count) })
        ]

        // Blocks until all four operations have finished, so the slots are safe to read afterwards.
        queue.addOperations(operations, waitUntilFinished: true)

        return a0 + a1 + a2 + a3
    }
}

And I have a shader that looks like this:

// Partial-sum kernel: each thread adds up one contiguous slice of `elementsPerSum`
// input values and accumulates the slice total into its own slot of `sums`.
//   data           - input values
//   dataLength     - number of valid elements in `data`
//   sums           - one slot per slice; the slice total is added to the existing value
//   elementsPerSum - number of input elements handled by each thread
kernel void parsum(const device DataType* data [[ buffer(0) ]],
               const device uint& dataLength [[ buffer(1) ]],
               device DataType* sums [[ buffer(2) ]],
               const device uint& elementsPerSum [[ buffer(3) ]],

               const uint tgPos [[ threadgroup_position_in_grid ]],
               const uint tPerTg [[ threads_per_threadgroup ]],
               const uint tPos [[ thread_position_in_threadgroup ]]) {

    uint resultIndex = tgPos * tPerTg + tPos; // This is the index of the individual result, this var is unique to this thread
    uint dataIndex = resultIndex * elementsPerSum; // Where the summation should begin
    uint endIndex = dataIndex + elementsPerSum < dataLength ? dataIndex + elementsPerSum : dataLength; // The index where summation should end

    // Threads with nothing to sum must not touch `sums`: their slot may not exist.
    if (dataIndex >= endIndex)
        return;

    // Accumulate in a thread-local variable instead of doing a read-modify-write
    // on device memory for every element.
    DataType sum = 0;
    for (; dataIndex < endIndex; dataIndex++)
        sum += data[dataIndex];

    // Single write; `+=` keeps the original accumulate-into-slot behaviour.
    sums[resultIndex] += sum;
}

To my surprise, sumParallel4 is the fastest, which I did not expect. I also noticed that when I call sumParralel and sumParallel3, whichever of the two is called first is always the slower one, even if I swap the order of the calls. (So if I call sumParralel first, it is slower; if I call sumParallel3 first, it is slower.)

Why is this? Why is sumParallel3 not a lot faster than sumParralel? Why is sumParallel4 the fastest, although it is calculated on the CPU?


How can I update my GPU function to use posix_memalign? I know it should be faster because the memory would be shared between the GPU and the CPU, but I don't know which array should be allocated this way (data or result), and how can I allocate data with posix_memalign if data is a parameter passed into the function?

回答1:

In running these tests on an iPhone 6, I saw the Metal version run between 3x slower and 2x faster than the naive CPU summation. With the modifications I describe below, it was consistently faster.

I found that a lot of the cost in running the Metal version could be attributed not merely to the allocation of the buffers, though that was significant, but also to the first-time creation of the device and compute pipeline state. These are actions you'd normally perform once at application initialization, so it's not entirely fair to include them in the timing.

It should also be noted that if you're running these tests through Xcode with the Metal validation layer and GPU frame capture enabled, that has a significant run-time cost and will skew the results in the CPU's favor.

With those caveats, here's how you might use posix_memalign to allocate memory that can be used to back a MTLBuffer. The trick is to ensure that the memory you request is in fact page-aligned (i.e. its address is a multiple of getpagesize()), which may entail rounding up the amount of memory beyond how much you actually need to store your data:

// Number of elements that actually carry data.
let dataCount = 1_000_000
let dataSize = dataCount * strideof(DataType)

// Round the allocation up to a whole number of pages.
let pageSize = Int(getpagesize())
let pageCount = (dataSize + (pageSize - 1)) / pageSize

// posix_memalign reports failure through its return value and leaves the pointer
// unusable, so stop here rather than write through it below.
var dataPointer: UnsafeMutablePointer<Void> = nil
let allocationStatus = posix_memalign(&dataPointer, pageSize, pageCount * pageSize)
precondition(allocationStatus == 0 && dataPointer != nil, "posix_memalign failed")

// `data.count` is the padded element count (whole pages), not `dataCount`.
let data = UnsafeMutableBufferPointer(start: UnsafeMutablePointer<DataType>(dataPointer),
                                      count: (pageCount * pageSize) / strideof(DataType))

// Only the first `dataCount` elements are initialized; the rest is padding.
for i in 0..<dataCount {
    data[i] = 200
}

This does require making data an UnsafeMutableBufferPointer<DataType>, rather than an [DataType], since Swift's Array allocates its own backing store. You'll also need to pass along the count of data items to operate on, since the count of the mutable buffer pointer has been rounded up to make the buffer page-aligned.

To actually create a MTLBuffer backed with this data, use the newBufferWithBytesNoCopy(_:length:options:deallocator:) API. It's crucial that, once again, the length you provide is a multiple of the page size; otherwise this method returns nil:

// Byte length of the whole allocation: `data.count` is the padded element count.
let roundedUpDataSize = data.count * strideof(DataType)

// Wrap the existing allocation without copying it. No deallocator is passed,
// so the memory still has to be released by the caller.
let dataBuffer = device.newBufferWithBytesNoCopy(
    data.baseAddress,
    length: roundedUpDataSize,
    options: [],
    deallocator: nil)

Here, we don't provide a deallocator, but you should free the memory when you're done using it, by passing the baseAddress of the buffer pointer to free().