I'm doing some number crunching on a raster with arcpy
and want to use the multiprocessing
package to speed things up. Basically, I need to loop through a list of tuples, do some raster calculations using each tuple and write some outputs to files. My inputs consist of a data raster (bathymetry), a raster that defines zones, and a tuple of two floats (water surface elevation, depth). My procedure consists of a function computeplane
which takes a tuple and runs a series of raster calculations to produce five rasters (total, littoral, surface, subsurface, profundal), and then calls the function processtable
for each of those rasters to write values to a dbf using arcpy.sa.ZonalStatisticsAsTable
, add some fields using arcpy.AddField_management
, convert the dbf to a csv using arcpy.TableToTable_conversion
, and finally delete the dbf file using arcpy.Delete_management
.
Based on some other SO posts, I've wrapped my code up in main()
so that multiprocessing.Pool
will supposedly play nice. I use main()
to create the set of tuples and pool.map
to do the multiprocessing. I use the tempfile
package to pick names for the dbf file to avoid name conflicts; the csv file names are guaranteed to not conflict with the other threads.
I have tested my code with a for loop and it works fine, but when I try to use pool.map
I get
RuntimeError: ERROR 010240: Could not save raster dataset to
C:\Users\Michael\AppData\Local\ESRI\Desktop10.4\SpatialAnalyst\lesst_ras
with output format GRID.
What is happening here? This error does not show up in the non-multiprocess version of the code, and I don't write out the rasters anywhere---then again, I don't know how arcpy
deals with the intermediate rasters (I sure don't think it keeps them in memory though---they are too large). Do I need to tell arcpy
something about the way it handles the raster calculations for multiprocessing to work? I've included my python file below.
import arcpy
arcpy.CheckOutExtension("Spatial")
import arcpy.sa
import numpy
import multiprocessing
import tempfile
bathymetry_path = r'C:/GIS workspace/RRE/habitat.gdb/bathymetry_NGVD_meters'
zones_path = r'C:/GIS workspace/RRE/habitat.gdb/markerzones_meters'
table_folder = r'C:/GIS workspace/RRE/zonetables'
bathymetry = arcpy.sa.Raster(bathymetry_path)
zones = arcpy.sa.Raster(zones_path)
def processtable(raster_obj, zone_file, w, z, out_folder, out_file):
temp_name = "/" + next(tempfile._get_candidate_names()) + ".dbf"
arcpy.sa.ZonalStatisticsAsTable(zone_file, 'VALUE', raster_obj, out_folder + temp_name, "DATA", "SUM")
arcpy.AddField_management(out_folder + temp_name, "wse", 'TEXT')
arcpy.AddField_management(out_folder + temp_name, "z", 'TEXT')
arcpy.CalculateField_management(out_folder + temp_name, "wse", "'" + str(w) + "'", "PYTHON")
arcpy.CalculateField_management(out_folder + temp_name, "z", "'" + str(z) + "'", "PYTHON")
arcpy.TableToTable_conversion(out_folder + temp_name, out_folder, out_file)
arcpy.Delete_management(out_folder + temp_name)
def computeplane(wsedepth):
wse = wsedepth[0]
depth = wsedepth[1]
total = bathymetry < depth
littoral = total & ((wse - bathymetry) < 2)
surface = total & ~(littoral) & ((total + wse - depth) < (total + 2))
profundal = total & ((total + wse - depth) > (total + 5))
subsurface = total & ~(profundal | surface | littoral)
# zonal statistics table names
total_name = 'total_w' + str(wse) + '_z' + str(depth) + '.csv'
littoral_name = 'littoral_w' + str(wse) + '_z' + str(depth) + '.csv'
surface_name = 'surface_w' + str(wse) + '_z' + str(depth) + '.csv'
subsurface_name = 'subsurface_w' + str(wse) + '_z' + str(depth) + '.csv'
profundal_name = 'profundal_w' + str(wse) + '_z' + str(depth) + '.csv'
# compute zonal statistics
processtable(total, zones, wse, depth, table_folder, total_name)
processtable(littoral, zones, wse, depth, table_folder, littoral_name)
processtable(surface, zones, wse, depth, table_folder, surface_name)
processtable(profundal, zones, wse, depth, table_folder, profundal_name)
processtable(subsurface, zones, wse, depth, table_folder, subsurface_name)
def main():
watersurface = numpy.arange(-15.8, 2.7, 0.1)
# take small subset of the tuples: watersurface[33:34]
wsedepths = [(watersurface[x], watersurface[y]) for x in range(watersurface.size)[33:34] for y in range(watersurface[0:x+1].size)]
pool = multiprocessing.Pool()
pool.map(computeplane, wsedepths)
pool.close()
pool.join()
if __name__ == '__main__':
main()
UPDATE
Some more sleuthing reveals that this isn't so much a multiprocessing
problem as an issue with the way that ArcGIS does raster processing. Raster algebra results are written to files in the default workspace; in my case I had not specified a folder so arcpy
was writing the rasters to some AppData folder. ArcGIS uses basic names like Lessth, Lessth_1, etc. based on what your algebra expression is. Since I didn't specify a workspace, all the multiprocessing
threads were writing to this folder. While a single arcpy
process can keep track of names, the multiple processes all try to write the same raster names and bump into the other process' locks.
I tried creating a random workspace (file gdb) at the beginning of computeplane
and then deleting it at the end, but arcpy
usually doesn't release its lock in a timely manner and the process crashes on the delete statement. So I'm not sure how to proceed.
Well, the solution to ERROR 010240
was to use the arcpy.gp.RasterCalculator_sa
funtion instead of arcpy.sa
to write rasters with specified output names. Unfortunately, after re-implementation I ran into
FATAL ERROR (INFADI)
MISSING DIRECTORY
which is described in another stackexchange post. The recommendations in that post are to specify a different workspace in each function call, which was the first thing I tried (without success). There's also some discussion that only one raster can be written out at a time, so there's no point to mutiprocessing raster calculations anyway. I give up!
I've also looked a lot to make one script that worked.
As your data is a little different from what I'm used to, I can't test the answer, neither tell if it will work but could say somethings to open your mind to multiprocess, and propose a new script for you to test.
I've seen that there are things that can, others can't be parallelized, or gain almost nothing from multiprocesses.
Good practices to use multiprocesses and arcpy are to not use Geodatabases, try to write real '.tif' files or 'in_memory/tempfile' and delete then after calculations, inside the function to be parallelized. Don't bother yourself with space at first moment, reduce your data to test, and when it works improve it by deleting temp files, put some useful printing in the middle of it.
Back to the question, there are a few things in your script that can't be used, like passing a arcpy.Raster object, instead of that you have to pass the path to the raster and read that inside the function. The path to the raster is a string so its is Pickable, meaning that they can be passed easily to one new process and special attention must be given to that.
Other thing is to declare a variable like 'table_folder' and expect that all your 7 core nucleus get back to the original process in the first core to ask the path to it.
Multiprocesses don't easily share variables among each other and with the original code, you must make a function that runs on its own. What the main module sends to a idle cpu is to create a new python process with almost only the function declaration the passed parameters to it and the necessary imported modules, everything just ready to execute, it can't look back to ask something.
what worked for me also is to use apply_async
inside a very cleaver function (not created by me) that creates a dictionary using the processes as keys and the result from apply_async
as values, and then get it inside a try/except
to if some error occurs to one process, it won't stop the main process execution.
something like that:
def computeplane_multi(processList):
pool = Pool(processes=4, maxtasksperchild=10)
jobs= {}
for item in processList:
jobs[item[0]] = pool.apply_async(computeplane, [x for x in item])
for item,result in jobs.items():
try:
result = result.get()
except Exception as e:
print(e)
pool.close()
pool.join()
back to your code, I made a few modifications to you try(and tell me):
import arcpy
import numpy
import multiprocessing
import tempfile
bathymetry_path = r'C:/GIS workspace/RRE/habitat.gdb/bathymetry_NGVD_meters'
zones_path = r'C:/GIS workspace/RRE/habitat.gdb/markerzones_meters'
table_folder = r'C:/GIS workspace/RRE/zonetables'
def computeplane(bathymetry_path,zones_path,wsedepth):
def processtable(raster_obj, zones_path, w, z, out_folder, out_file):
zone_file = arcpy.sa.Raster(zones_path)
temp_name = "/" + next(tempfile._get_candidate_names()) + ".dbf"
arcpy.sa.ZonalStatisticsAsTable(zone_file, 'VALUE', raster_obj, out_folder + temp_name, "DATA", "SUM")
arcpy.AddField_management(out_folder + temp_name, "wse", 'TEXT')
arcpy.AddField_management(out_folder + temp_name, "z", 'TEXT')
arcpy.CalculateField_management(out_folder + temp_name, "wse", "'" + str(w) + "'", "PYTHON")
arcpy.CalculateField_management(out_folder + temp_name, "z", "'" + str(z) + "'", "PYTHON")
arcpy.TableToTable_conversion(out_folder + temp_name, out_folder, out_file)
arcpy.Delete_management(out_folder + temp_name)
bathymetry = arcpy.sa.Raster(bathymetry_path)
wse = wsedepth[0]
depth = wsedepth[1]
total = bathymetry < depth
littoral = total & ((wse - bathymetry) < 2)
surface = total & ~(littoral) & ((total + wse - depth) < (total + 2))
profundal = total & ((total + wse - depth) > (total + 5))
subsurface = total & ~(profundal | surface | littoral)
# zonal statistics table names
total_name = 'total_w' + str(wse) + '_z' + str(depth) + '.csv'
littoral_name = 'littoral_w' + str(wse) + '_z' + str(depth) + '.csv'
surface_name = 'surface_w' + str(wse) + '_z' + str(depth) + '.csv'
subsurface_name = 'subsurface_w' + str(wse) + '_z' + str(depth) + '.csv'
profundal_name = 'profundal_w' + str(wse) + '_z' + str(depth) + '.csv'
# compute zonal statistics
processtable(total, zones_path, wse, depth, table_folder, total_name)
processtable(littoral, zones_path, wse, depth, table_folder, littoral_name)
processtable(surface, zones_path, wse, depth, table_folder, surface_name)
processtable(profundal, zones_path, wse, depth, table_folder, profundal_name)
processtable(subsurface, zones_path, wse, depth, table_folder, subsurface_name)
print('point processed : {},{}'.format(wsedepth[0],wsedepth[1]))
def computeplane_multi(processList):
pool = Pool(processes=4, maxtasksperchild=10)
jobs= {}
for item in processList:
jobs[item[0]] = pool.apply_async(computeplane, [x for x in item])
for item,result in jobs.items():
try:
result = result.get()
except Exception as e:
print(e)
pool.close()
pool.join()
def main():
watersurface = numpy.arange(-15.8, 2.7, 0.1)
# take small subset of the tuples: watersurface[33:34]
wsedepths = [(watersurface[x], watersurface[y]) for x in range(watersurface.size)[33:34] for y in range(watersurface[0:x+1].size)]
processList = []
for i in wsedepths:
processList.append((bathymetry_path,zones_path,i))
computeplane_multi(processList)
if __name__ == '__main__':
main()