Arcpy Multiprocessor issues

7329
15
06-27-2011 10:02 PM
StacyRendall1
Occasional Contributor III
Hi,

I am currently writing an arcpy script that churns through some large datasets and extracts data; to make this quicker I figured it might be worth using something like Parallel Python or the inbuilt Multiprocessing library. Ideally I require the script to be run from within Arc (well... there seems to be an inescapable Schema Lock problem using direct iterators within the script, which can be circumvented by using modelbuilder iterators that run the script once per iteration, but that is another story).

However, I can get a script by either method to run fine outside of Arc (still calling arcpy), but running from within Arc the processing job gets part way through before failing:

  • with Parallel Python it gets to the point where job_server = pp.Server() is defined, then opens another instance of ArcMap if the script is being run from ArcMap, or Catalog if running from Catalog. From Catalog the script sits waiting (progress bar doing its swishy thing, but no processing occurring) until this new instance is closed, at which point the script fails with the following:<class 'struct.error'>: unpack requires a string argument of length 8

  • Failed to execute (scriptPP). 
    When running from ArcMap, the new instance initially provides an error dialog stating â??Could not open the specified fileâ?�, after clicking OK it then proceeds as above (same error as from Catalog, only occurs after closing ArcMap). This usually also causes AppROT to fail as well. 


  • With Multiprocessing it gets as far as actually running the module, again attempts to open a new instance of whichever program it was started from. If running from ArcMap it opens an instance for each process initiated; each of which starts with an error dialog stating â??Could not find file: from multiprocessing.forking import main; main().mxdâ?�, upon clicking OK the instance opens as normal. Upon closing all new instances, the initial ArcMap becomes unresponsive (sometimes the processing dialog is still there, swishing away, sometimes it has disappeared), although there is no error message.

  • Running from Catalog, it brings up an error dialog stating that â??C:\Program Files (x86)\ArcGIS\Desktop10.0\Bin\from multiprocessing.forking import main; main() was not found.â?� Once for each instance. Then the progress dialog and ArcCatalog both become unresponsiveâ?¦  


I have created a simple python script for Multiprocessing which can reproduce the error; running fine if run directly, but causing errors as stated above when running from Arc, see below. Is anyone else familiar with these issues? Can they be confirmed on other setups? It appears that Arc is somehow trying to open parts of the modules that the processes are using, but I am not sure why. Any help would be greatly appreciated!

import arcpy 
import multiprocessing 

def simpleFunction(arg): 
return arg*3 

if __name__ == "__main__": 
arcpy.AddMessage(" Multiprocessing test...") 
jobs = [] 
pool = multiprocessing.Pool() 
for i in range(10): 
job = pool.apply_async(simpleFunction, (i,)) 
jobs.append(job) 

for job in jobs: # collect results from the job server 
print job.get() 

del job, jobs 
arcpy.AddMessage(" Complete") 
print "Complete"


ArcGIS Desktop 10.0 sp2, ArcInfo Licence running on Windows 7 with Core i7 and 8GB RAM.
Tags (2)
0 Kudos
15 Replies
JasonScheirer
Regular Contributor II
You'll need to run your script out of process (there's a checkbox in the script tool settings dialog) to get it to work. ArcMap sets up a runtime environment that may not be compatible with the way that multiprocessing bootstraps its runtime.
0 Kudos
StacyRendall1
Occasional Contributor III
Cool, thanks.

Just one note on that method; the default action when clicking on a Python file (*.py) within Windows must be set to run it (not edit it).

Also, can you add something to the documentation explaining this?

Regards
0 Kudos
ChrisSnyder
Regular Contributor III
Seeing that Arcpy isn't a native Python object, would there be any benifit (or would it even work) to send an arcpy.Whatever commands to Parallel Python, Pickle, etc.?

The only way I've ever gotten a "parallel" process to actually work with gp/arcpy objects is using os.spawnv and/or subprocess.
0 Kudos
StacyRendall1
Occasional Contributor III
Seeing that Arcpy isn't a native Python object, would there be any benifit (or would it even work) to send an arcpy.Whatever commands to Parallel Python, Pickle, etc.?

The only way I've ever gotten a "parallel" process to actually work with gp/arcpy objects is using os.spawnv and/or subprocess.


I doubt that sending an arcpy.Whatever command to Multiprocessing or Parallel Python would work... Haven't tried it though.

In this case I am able to split the data beforehand, then define a python module that does the processing on the section of data that is passed to it; it's these things that run in parallel. Let me know if you want more info - I can prepare a sample code for you - there's not much around on the internet for this kind of thing!
0 Kudos
JasonScheirer
Regular Contributor II
Correct, many objects in arcpy/arcgisscripting are not pickleable, so sending them directly across the wire in multiprocessing will not work. There are workarounds, however. If you are sending rows around, you can use tuples with the column values you need, and if you are using geometries you can use the __geo_interface__ attribute to send and the AsShape function to convert back to the geometry object in your function.
0 Kudos
ChrisSnyder
Regular Contributor III
In this case I am able to split the data beforehand, then define a python module that does the processing on the section of data that is passed to it; it's these things that run in parallel. Let me know if you want more info - I can prepare a sample code for you - there's not much around on the internet for this kind of thing!


Stacy, I'd love to see some samples. Ocassionally, I have the need to write large number-crunching stuff in Python (traversing large arrays or dictionary-type structures), and I'd love to explore ways to speed it up the processing. Youre' right, it's pretty hard to get practical examples of using some of this stuff!
0 Kudos
StacyRendall1
Occasional Contributor III
Stacy, I'd love to see some samples. Ocassionally, I have the need to write large number-crunching stuff in Python (traversing large arrays or dictionary-type structures), and I'd love to explore ways to speed it up the processing. Youre' right, it's pretty hard to get practical examples of using some of this stuff!


Hi Chris - I'm working on this in my free time; hope to get something up before next week...
0 Kudos
StacyRendall1
Occasional Contributor III
Chris,

The following is a simple example of an Arcpy script that uses Multiprocessing (or Parallel Python). I have created a blog, pythongisandstuff.wordpress.com/, which goes into greater detail of the method for developing normal scripts into scripts suitable for Multiprocessing, and has the files used in this example available for download.

There are a few requirements before you can make an Arcpy process suitable for use with Python�??s Multiprocessing library:

  1. The most calculation intensive (time consuming) part of the code must be able to be made into a Python �??module�?? and parallelised; which will be described in the following posts.

  2. Once it is made into a module, there must be no issues with data access �?? each invocation of the module should either write to a different output database (Arc locks the entrie *.gdb in use, not just the feature class being accessed) or pass data back for it to be written only at a later stage.


To explain using Multiprocessing with Python, I will set out a hypothetical example. The objective of the example is to identify the number and type, and accumulate a weight value, of all the Polygons within a certain distance of some Point features. The Polygon feature class has �??polyType' and �??polyWeight�?? attributes. This is just a simplified example that I thought up for explaining Multiprocessing �?? sorry if there is a better method than what follows for actually doing this!

Both Multiprocessing and Parallel Python use a Python list to which jobs are appended, and from which the result is then extracted. Each job is processed in turn by the first available worker (CPU/core). So, if there are six jobs in the list on a system with two CUPs, the first two jobs will start near simultaneously, then as each CPU finishes its task and becomes free it will start working on the next job in the list. Both methods can use all available CPUs on the system, or they can be limited to a predefined number (see below), or they may be limited by the method itself �?? in this example, as there are only 6 Polygon types, only 6 CPUs would be used.

See next post for code...

On my computer, the non-parallel scripts posted in Part 2 of my blog all take about 300 seconds to execute if run from the command line, and about 400 seconds if run as a tool within ArcMap or ArcCatalog. The parallelised script above takes about 170 seconds to execute if run from the command line, and 340 seconds if run as a tool within ArcMap or ArcCatalog. It seems that the first set of processes through the queue (i.e. two if two CPUs are being used, four if four CPUs are being used) are delayed for some reason, but latter processes run a lot faster.

Feel free to ask questions or point out any improvements!
0 Kudos
StacyRendall1
Occasional Contributor III
GDB files used in this code are attached to this post; see my blog pythongisandstuff.wordpress.com, which goes into greater detail about the methods for developing normal scripts into scripts suitable for Multiprocessing...

'''
Step 4 of 4: Example of using Multiprocessing/Parallel Python with Arcpy...

Can be run either:
 1. from the command line/a Python IDE (adjust paths to feature classes, as necessary)
 2. as a Script tool within ArcGIS (ensure 'Run Ptyhon script in Process' is NOT checked when importing)
 
 The Parallel Python library must be installed before it can be used.
'''

import arcpy
import multiprocessing
import time
try:
 import pp
 forceMP = False
except ImportError:
 forceMP = True

def performCalculation(points_fC, polygons_fC, searchDist, typeList, calcPlatform_input=None):
 '''performCalculation(pointsFeatureClass, polygonsFeatureClass, searchDistance, typeList, calcPlatform)
 
 All inputs are specific.
 
 calcPlatform is either 'mp', to use the inbuilt Multiprocessing library, or 'pp', to use Parallel Python (if it exists, otherwise defaults to mp)
 
 '''
  
 # ---------------------------------------------------------------------------
 ## Pre-calculation checks
 # ---------------------------------------------------------------------------
 
 # Check calculation platform is valid, or resort to default...
 defaultCalcTpye = 'mp'
 calcTypeExplainDict = {'pp':'Parallel Python', 'mp':'Multiprocessing'}
 if forceMP: # unable to import Parallel Python library (it has to be installed seperately)
  arcpy.AddMessage("   WARNING: Cannot load Parallel Python library, forcing Multiprocessing library...")
  calcPlatform = 'mp'
 elif (calcPlatform_input not in ['mp', 'pp']) or (calcPlatform_input == None): # Invalid/no input, resort to default
  arcpy.AddMessage("   WARNING: Input calculation platform '%s' invalid; should be either 'mp' or 'pp'. Defaulting to %s..." % (calcPlatform_input, calcTypeExplainDict[defaultCalcTpye]))
  calcPlatform = defaultCalcTpye
 else:
  calcPlatform = calcPlatform_input


 # ---------------------------------------------------------------------------
 ## Data extraction (parallel execution)
 # ---------------------------------------------------------------------------
 
 searchDist = int(searchDist) # convert search distance to integer...

 # check all datasets are OK; if not, provide some useful output and terminate
 valid_points = arcpy.Exists(points_fC)
 arcpy.AddMessage("Points Feature Class: "+points_fC)
 valid_polygons = arcpy.Exists(polygons_fC)
 arcpy.AddMessage("Polygons Feature Class: "+polygons_fC)
 dataCheck  = valid_points & valid_polygons
 
 if not dataCheck:
  if not valid_points:
   arcpy.AddError("Points database or feature class, %s,  is invalid..." % (points_fC))
  if not valid_polygons:
   arcpy.AddError("Polygons database or feature class, %s,  is invalid..." % (polygons_fC))
   
 else: # Inputs are OK, start calculation...
 
  for type in typeList: # add fields to Points file
   arcpy.AddField_management(points_fC, "polygon_type%s_Sum" % type, "DOUBLE")
   arcpy.CalculateField_management(points_fC, "polygon_type%s_Sum" % type, 0, "PYTHON")
   arcpy.AddField_management(points_fC, "polygon_type%s_Count" % type, "DOUBLE")
   arcpy.CalculateField_management(points_fC, "polygon_type%s_Count" % type, 0, "PYTHON")
   arcpy.AddMessage("    Added polygon_type%s_Sum and polygon_type%s_Count fields to Points." % (type,type))

  pointsDataDict = {} # dictionary: pointsDataDict[pointID][Type]=[sum of Polygon type weightings, count of Ploygons of type]
  jobs = [] # jobs are added to the list, then processed in parallel by the available workers (CPUs)

  if calcPlatform == 'mp':
   arcpy.AddMessage("    Utilising Python Multiprocessing library:")
   pool = multiprocessing.Pool() # initate the processing pool - use all available resources
#   pool = multiprocessing.Pool(2) # Example: limit the processing pool to 2 CPUs...
   for type in typeList: # For this specific example
    arcpy.AddMessage("      Passing %s to processing pool..." % type)
    jobs.append(pool.apply_async(findPolygons, (points_fC, polygons_fC, type, searchDist))) # send jobs to be processed

   for job in jobs: # collect results from the job server (waits until all the processing is complete)
    if len(pointsDataDict.keys()) == 0: # first output becomes the new dictionary
     pointsDataDict.update(job.get())
    else:
     for key in job.get(): # later outputs should be added per key...
      pointsDataDict[key].update(job.get()[key])
   del jobs

  elif calcPlatform == 'pp':
   ppservers=()
   job_server = pp.Server(ppservers=ppservers) # initate the job server - use all available resources
#   job_server = pp.Server(2, ppservers=ppservers) # Example: limit the processing pool to 2 CPUs...
   arcpy.AddMessage("    Utilising Parallel Python library:")
   for type in typeList: # For this specific example, it would only initate three processes anyway...
    arcpy.AddMessage("      Passing %s to processing pool..." % type)       
    jobs.append(job_server.submit(findPolygons, (points_fC, polygons_fC, type, searchDist), (), ("arcpy",))) # send jobs to be processed

   for job in jobs: # collect results from the job server (waits until all the processing is complete)
    if len(pointsDataDict.keys()) == 0: # first output becomes the new dictioanry
     pointsDataDict.update(job())
    else:
     for key in job():
      pointsDataDict[key].update(job()[key]) # later outputs should be added per key...
   del jobs
   
  # ---------------------------------------------------------------------------
  ## Writing data back to points file
  # ---------------------------------------------------------------------------

  arcpy.AddMessage("    Values extracted; writing results to Points...")
  pointsRowList = arcpy.UpdateCursor(points_fC)
  for pointsRow in pointsRowList: # write the values for every point
   pointID = pointsRow.getValue("PointID")
   for type in typeList:
    pointsRow.setValue("polygon_type%s_Sum" % type, pointsRow.getValue("polygon_type%s_Sum" % type) + pointsDataDict[pointID][type][0])
    pointsRow.setValue("polygon_type%s_Count" % type, pointsRow.getValue("polygon_type%s_Count" % type) + pointsDataDict[pointID][type][1])
    
   pointsRowList.updateRow(pointsRow)
   del pointsRow
   
  del pointsRowList
  # just make sure any locks are cleared...
  del calcPlatform, calcPlatform_input, calcTypeExplainDict, dataCheck, defaultCalcTpye, job, key, pointID, pointsDataDict, points_fC, polygons_fC, type, valid_points, valid_polygons, searchDist, typeList


def findPolygons(points_FC, polygons_FC, Type, search_dist):
 funcTempDict = {}
 arcpy.MakeFeatureLayer_management(polygons_FC, '%s_%s' % (polygons_FC, Type))
 arcpy.MakeFeatureLayer_management(points_FC, '%s_%s' % (points_FC, Type))
 pointsRowList = arcpy.SearchCursor('%s_%s' % (points_FC, Type))
 for pointRow in pointsRowList: # for every origin
  pointID = pointRow.getValue("PointID")

  try:
   funcTempDict[pointID][Type] = [0,0]
  except KeyError: # first time within row
   funcTempDict[pointID] = {}
   funcTempDict[pointID][Type] = [0,0]

  arcpy.SelectLayerByAttribute_management('%s_%s' % (points_FC, Type), 'NEW_SELECTION', '"PointID" = \'%s\'' % pointID)
  arcpy.SelectLayerByLocation_management('%s_%s' % (polygons_FC, Type), 'INTERSECT', '%s_%s' % (points_FC, Type), search_dist, 'NEW_SELECTION')
  arcpy.SelectLayerByAttribute_management('%s_%s' % (polygons_FC, Type), 'SUBSET_SELECTION', '"polyType" = %s' % Type)
  polygonsRowList = arcpy.SearchCursor('%s_%s' % (polygons_FC, Type))
  for polygonsRow in polygonsRowList:
   funcTempDict[pointID][Type][0] += polygonsRow.getValue("polyWeighting")
   funcTempDict[pointID][Type][1] += 1
 
 return funcTempDict


if __name__ == '__main__':
 # Read the parameter values:
 #  1: points feature class (string to location, e.g. c:\GIS\Data\points.gdb\points01)
 #  2: polygons feature class (string to location)
 #  3: search distance for polygons, integer, e.g 500
 #  4: calculation type ('mp' to use Multiprocessing library, 'pp' to use Parallel Python library (if available, otherwise defaults to mp))
 pointsFC = arcpy.GetParameterAsText(0) # required
 polygonsFC = arcpy.GetParameterAsText(1) # required
 search_Distance = arcpy.GetParameterAsText(2) # required
 calcType = arcpy.GetParameterAsText(3) # optional (will default to MP)

 t_start = time.clock()
 
 type_list = [1,2,3,4,5,6] # polygon types to search for...
 
 # run calculation
 if '' in [pointsFC, polygonsFC, search_Distance]:# if not running from Arc, the input parameters all come out as ''
  pointsFC = "c:\\Work\\GIS\\Data\\Points.gdb\\points01"
  polygonsFC = "c:\\Work\\GIS\\Data\\Polygons.gdb\\polygons01"
  search_Distance = 1000
  performCalculation(pointsFC, polygonsFC, search_Distance, type_list)
 else: 
  performCalculation(pointsFC, polygonsFC, search_Distance, type_list, calcType)
 
 arcpy.AddMessage("      ... complete in %s seconds." % (time.clock() - t_start))
0 Kudos