Multiprocessing with Spatial Analyst

05-22-2013 07:40 AM
DwightLanier
New Contributor III
Hi all,

A few questions on using multiprocessing with SA tools...

I am using ArcGIS 10.0 (SP3) on Windows 7, and I'm fine running the Python script standalone outside of ArcGIS; no need to make a tool out of it.

First, when setting up the basic structure of the script, where should I make the import calls for SA, specifically from arcpy.sa import *? Is this something done once with the other imports like sys, os, etc., or does it need to be done within the worker function? And what about checking out the extension [arcpy.CheckOutExtension("Spatial")]: should that be done in main or within the worker function?

Will any of this even work in the multiprocessing environment if there are many processes trying to check out the SA extension at the same time?
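
If the simultaneous checkouts turn out to be the issue, I suppose each worker could at least test the checkout status before doing any work. Just a sketch of what I have in mind, checking the status string that CheckOutExtension returns:

# Sketch only: guard the checkout inside each worker so a failed checkout
# comes back as a readable message instead of a silent failure.
def calcVS(vsFC, elevationRas, outputWS, count):
    import arcpy
    status = arcpy.CheckOutExtension("Spatial")
    if status != "CheckedOut":
        # e.g. "Unavailable" or "NotInitialized" when no license is free
        return "Could not check out SA for vs_%s: %s" % (count, status)
    # ...viewshed work goes here...
    arcpy.CheckInExtension("Spatial")
    return "Completed: vs_%s" % (count)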

If this is possible, my simple understanding of the setup would look something like this:

# Import modules...
import arcpy
from arcpy.sa import *
import multiprocessing
import os

# Define the worker function...
def calcVS(vsFC, elevationRas, outputWS, count):

    arcpy.CheckOutExtension("Spatial")

    scratchWS = os.path.join(outputWS, "VS_%s" % (count))
    os.mkdir(scratchWS)
    arcpy.env.scratchWorkspace = scratchWS

    outVS = arcpy.sa.Viewshed(elevationRas, vsFC)
    outVS.save("vs_%s.img" % (count))

    return "Completed: vs_%s" % (count)

if __name__ == "__main__":

    # list of shapefiles to be used as observers,
    # pretend this is populated...
    fcs = []
    elevRas = r"C:\Working01\DEM.img"
    outWS = r"C:\Working01\Test01"

    cores = multiprocessing.cpu_count()

    # Start pool
    pool = multiprocessing.Pool(cores - 1)
    jobs = []

    counter = 1
    for fc in fcs:
        jobs.append(pool.apply_async(calcVS, (fc, elevRas, outWS, counter)))
        counter += 1

    # Clean up pool...
    pool.close()
    pool.join()

    # Print results
    results = [job.get() for job in jobs]
    print results



In this example I have taken into consideration something I saw in another post about needing to create separate scratch workspaces for ArcGIS to access on each run... [http://gis.stackexchange.com/questions/17869/multiprocessing-errors-arcgis-implementation]

So, my problem thus far is that when I have everything set up in this way, the multiprocessing kicks in and all the cores fire up and start thinking. But when I monitor the output directory, nothing is written. I would think that even the first step, creating a new directory for the temp scratch workspace, would execute, but it doesn't. I have commented out the CheckOutExtension just to see if each job will at least perform that part, but no luck.

The one error I do get doesn't help much, at least not for me but maybe it will turn on a light for someone else:

Traceback (most recent call last):
File "C:/Working01/VS_MP.py", line xx, in <module>
results = [job.get() for job in jobs]
File "C:\Python26\ArcGIS10.0\lib\multiprocessing\pool.py", line 422, in get
raise self._value
RuntimeError: <unprintable RuntimeError object>


I would suspect that it doesn't like me calling get() on whatever it is I return from the worker function, or that the error returned from the function can't be printed. I have done several things to try and troubleshoot this. I have commented out any return from the worker, having it just perform the viewshed calculation and save the raster, and then commented out the "results" call. Am I running into problems here because of the way I'm using apply_async to add jobs? Do I need to do something with them that I'm not doing? I use this method with success on lots of other multiprocessing of vector data. I think I have tried, but will try again, using the pool.map function...
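
One thing I may try next is wrapping the whole worker in a try/except and returning the traceback text, so whatever is blowing up inside the pool is at least printable when I call job.get(). Just a sketch of what I mean, using the same names as above:

import traceback

def calcVS(vsFC, elevationRas, outputWS, count):
    try:
        # ...the viewshed work from the worker above...
        return "Completed: vs_%s" % (count)
    except Exception:
        # hand back the full traceback as a plain string so the parent
        # never has to re-raise an unprintable RuntimeError
        return "Failed: vs_%s\n%s" % (count, traceback.format_exc())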

I know that's a lot to ask, but haven't seen any examples yet on using SA within python multiprocessing. Thanks in advance for any guidance...

Dwight
7 Replies
camdenlowrance
New Contributor
____Bump____
RhettZufelt
MVP Frequent Contributor
Don't know if this is related, but I had issues trying to check out the SA extension in the past.  No errors, it just didn't do the work.

Saw online (don't remember if it was a post or the help pages) that once arcpy is imported, the license level can't be set, and I needed my ArcInfo license to utilize SA.

So, I did the following and it worked fine:

import arcinfo
import arcpy

then the rest of your imports and code.


Like I said, not sure if this would have anything to do with it, but would only take a few seconds to type it in and try.

Good luck,

R_
DwightLanier
New Contributor III
With the help of Greg B. at ESRI, finally got this working.  Below is a basic shell/template of what worked for me in terms of using SA tools with multiprocessing:

- first, switched to using pool.map() instead of pool.apply_async()

- called import arcpy and from arcpy.sa import * in the first lines

- within the calcVS() function, I wanted to be able to set the processing extent to the area each individual observer's viewshed calculation would actually cover, as opposed to the default of the entire DEM or some standard area that was not unique to each observer.  What worked here was also adding another import arcpy call within the function, which, to my understanding, gives each process its own set of arcpy environment variables, so that the multiprocessing calls to calcVS are not fighting to set the same global processing extent in arcpy.env.extent

- each call to the function needed its own unique scratch workspace.  I used the name of the observer to create a unique folder that exists only as long as the viewshed is working and is deleted afterwards.  These unique scratch workspaces keep the multiple instances of the function from writing over one another in the same workspace.

- the Spatial Analyst extension is checked out within the worker function

I think that's the meat of my learning experience here...so here is what the script looks like:


# Imports...
import arcpy
from arcpy.sa import *
import multiprocessing
import ntpath
import os
from re import split
import shutil
import time

def calcVS(argsCSV):

    import arcpy

    argsList = split(",", argsCSV)
    vsFC = argsList[0]
    elevRas = argsList[1]
    outWS = argsList[2]

    head, tail = ntpath.split(vsFC)

    buffVS = os.path.join(outWS, "buff_%s" % (tail[:-4]))
    arcpy.Buffer_analysis(vsFC, buffVS, "RADIUS2")
    arcpy.env.extent = buffVS
    arcpy.Delete_management(buffVS)

    scratchWS = os.path.join(outWS, tail[:-4])
    os.mkdir(scratchWS)
    arcpy.env.scratchWorkspace = scratchWS

    arcpy.CheckOutExtension("Spatial")

    outViewshed = Viewshed(elevRas, vsFC)
    outViewshed.save(os.path.join(outWS, "%s.tif" % (tail[:-4])))

    shutil.rmtree(scratchWS)
    arcpy.Delete_management(vsFC)


if __name__ == "__main__":

    obsShp = r"C:\Data\test.shp"
    uidField = "UID"
    elevationRas = r"C:\Data\DEM.img"

    outputWS = r"C:\Working01"
    arcpy.env.workspace = outputWS

    arcpy.gp.overwriteOutput = True

    start = time.clock()

    rows = arcpy.SearchCursor(obsShp)
    obsInfoList = [row.getValue(uidField) for row in rows]
    del row, rows

    vsList = []

    for obs in obsList:
        obsFileName = "%s_obs_base.shp" % (obs)
        arcpy.Select_analysis(obsShp, obsFileName, uidField+" = "+str(obs))
        vsList.append("%s,%s,%s" % (os.path.join(outputWS, obsFileName), elevationRas, outputWS))

    pool = multiprocessing.Pool()

    pool.map(calcVS, vsList)

    pool.close()
    pool.join()

    end = time.clock()
    duration = end - start
    hours, remainder = divmod(duration, 3600)
    minutes, seconds = divmod(remainder, 60)
    print "Completed in %dh:%dm:%fs" % (hours, minutes, seconds)


This runs like a champ, is about 2x faster than processing in series, and is the foundation for what I'm working towards.  When it is running, I can clearly see in the Task Manager all of the cores firing up and working simultaneously, and can watch the results being written to the output folder as they are completed in parallel.

Dwight
ChrisSnyder
Regular Contributor III
If it helps, I use this method to run large jobs using SA tools. Note I am using the subprocess module and not multiprocessing - but same difference. I found the secret is to reset the TEMP environment variables for every subprocess run (see the stuff relating to os.environ in the launchProcess function in the "master" script). These scripts are a bit of a hack job, but they work great for what I need them to do. Lights up 12 (out of 16) 3.8 GHz cores and runs smoothly for 12 hrs. SSDs help too :).
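
The TEMP trick boils down to something like this (a stripped-down paraphrase of the idea, not the actual launchProcess code from my scripts; the temp folder and the plain "python" call are just placeholders):

import os
import subprocess

def launchProcess(jobId, scriptPath):
    # give each child process its own TEMP/TMP folder so the scratch
    # files from parallel runs never collide with one another
    tempDir = r"C:\Temp\job_%s" % jobId
    if not os.path.exists(tempDir):
        os.makedirs(tempDir)
    env = os.environ.copy()
    env["TEMP"] = tempDir
    env["TMP"] = tempDir
    return subprocess.Popen(["python", scriptPath, str(jobId)], env=env)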
MathewCoyle
Frequent Contributor
@Dwight
Nice implementation. Just a couple questions I had. Did you find a particular advantage to using arcpy.gp.overwriteOutput vs .env? Also, how do you pass in your obsList object? Or is that supposed to be the obsInfoList?
DwightLanier
New Contributor III
@Dwight
Nice implementation. Just a couple questions I had. Did you find a particular advantage to using arcpy.gp.overwriteOutput vs .env? Also, how do you pass in your obsList object? Or is that supposed to be the obsInfoList?


For the gp vs env...no reason really, I think it may have just crept in there from another script.  😃

For passing in the arguments to the worker function: if I remember right, when using the apply_async() method I didn't have any trouble passing tuples of information.  You give it the function to be used and then the arguments, like pool.apply_async(calcVS, (path, elevRas, outWS)), and because you are setting up each job one at a time it was possible to use an iterator or loop to set up the combinations of arguments.

But when using pool.map() I wasn't sure how to do this, as generally what you're doing is giving the map call a function to use and then a list of items to perform that function on.  Thinking of it now, I'll try to pass something like pool.map(calcVS, vsList), where vsList contains [(path1, elevRas, outWS), (path2, elevRas, outWS), ...].  In the code above I was just using vsList to hold a comma-separated string for each observer, a concatenation of the path to the shapefile, the elevation raster, and the output workspace, like "path1,elevRas,outWS", and then parsing it back out into variables inside the worker function.

Tuples inside a list is probably a lot better, but maybe someone can school me on this if there's another way.  I was just trying to get it to work first.
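
If the tuple approach pans out, it would look roughly like this (untested sketch; path1 and path2 just stand in for the real observer shapefiles):

def calcVS(args):
    # pool.map hands each worker a single item, so unpack the tuple here
    vsFC, elevRas, outWS = args
    # ...rest of the worker as above...

if __name__ == "__main__":
    vsList = [(path1, elevRas, outWS), (path2, elevRas, outWS)]
    pool = multiprocessing.Pool()
    pool.map(calcVS, vsList)
    pool.close()
    pool.join()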
DuncanHornby
MVP Notable Contributor

Awesome! This thread has just saved me from wanting to jump off the nearest bridge!

I've been developing some multiprocessing code which makes use of several arcpy environment settings and Spatial Analyst. My code was consistently failing/aborting and sometimes completing without error! I came across this thread and saw that you were importing arcpy within the worker function. I updated my code and now everything is running as I expect. I owe you a pint!

Thanks for this really useful thread.
