Issues with multiprocessing and spatial analyst

4080
14
02-27-2014 12:03 AM
JamesRamm
New Contributor II
I have read every blog post and thread I can find on multiprocessing with arcpy and none of the fixes in them have fully addressed my problem.

I'm trying to do a relatively simple watershed calculation using multiprocessing.
The 'worker' function looks like this:
def multi_watershed(pnts, branchID, flowdir, flowacc, scratchWks):

    direc = tempfile.mkdtemp(dir = scratchWks) # If called in a pll process, needs to write to seperate directories
    arcpy.env.scratchWorkspace = direc    

    polylist = []
    for i, p in enumerate(pnts):
        pnt = arcpy.PointGeometry(arcpy.Point(p.x, p.y, ID=i))  #Convert the shapely point to an arcpy point
        pourpt = sa.SnapPourPoint(pnt, flowacc, 1000) 
        ws = sa.Watershed(flowdir, pourpt)  
        out = os.path.join(direc, "pol_%i"%i) #Generate a filename for the output polygon
        arcpy.RasterToPolygon_conversion(ws, out)
        polylist.append(out) #Append the output file to the list to be returned
    res = (branchID, polylist)
    return res


Given a list of points, it snaps the point to high flow accumulation, calculates the watershed and converts it to a polygon.
Using one process, this works fine.

I have a dictionary where each value is a list of points and I am trying to do the multiprocessing over this dictionary. The multiprocessing function looks like this:

def watershed_pll(data, flowdir, flowacc, tempfolder, proc=4):
    """ Calculate the watershed for each station point using parallel processing """
    pool = Pool(processes = proc)
    jobs = []
    for key, val in data.iteritems():
        jobs.append(pool.apply_async(multi_watershed, (val, key, flowdir, flowacc, temp)))    
    pool.close()
    pool.join()                   
    return jobs


It is as simple as can be and just returns the list of 'Apply_Result' objects. I then run this function from a script.
When using multiprocessing, sometimes it works, but more often than not I get one of these errors:

ERROR 010088: Invalid input geodataset (Layer, Tin, etc.).]

or

Unable to remove directory.  Possible causes:
1- Not owner of the directory
2- Another person or application is accessing this directory

or even

FATAL ERROR(INFADI)
MISSING FILE OR DIRECTORY

There seems to be no pattern as to if/when these errors will occur and which one it will be...

Any ideas?
Tags (2)
0 Kudos
14 Replies
DuncanHornby
MVP Notable Contributor
Just a comment, how are you running this script? I have never been able to get multiprocessing to work in pyscripter (my IDE of choice). A collegue of mine did use multiprocessing successfully but ran it in IDLE.
0 Kudos
JamesRamm
New Contributor II
Just a comment, how are you running this script? I have never been able to get multiprocessing to work in pyscripter (my IDE of choice). A collegue of mine did use multiprocessing successfully but ran it in IDLE.



Through spyder, but really the python console is a seperate process, so I doubt spyder is impacting anything.

I run other operations using arcpy and multiprocessing with no problem - the difference there is that they are only calling one tool; here there are 3 or 4.

My feeling is that arc is attempting to delete/move data inbetween operations that I haven't told it to....although I could be wrong.
0 Kudos
JamesRamm
New Contributor II
I wonder if this be one of two potential problems:

1. Environments/folders getting mixed up. The input data is passed as complete filepaths to some raster datasets which are outside of the scratchWorkspace (which is set locally for each process) where intermediate/output data is created. However, I have noticed that Arc may make folders (typically and 'info' folder) in the directories of the input data. Why is that? Can it be prevented, or can I prevent Arc from then trying to delete it?

2. Are there any potential problems with accessing the input raster data sets at the same time? I.e each process will be attempting to open and read from the flow direction and flow accumulation rasters which are passed into the function.
0 Kudos
DuncanHornby
MVP Notable Contributor
The info directory is an important part of an ESRI raster, you will not be able to prevent its generation. With regards to #2 just had an idea, if several processes connecting to the same data is the problem why not duplicate that data but with different names? It looks like you are attempting to use 4 cores so have 4 flow direction grids? Just an idea?
0 Kudos
JamesRamm
New Contributor II
Yes that is worth trying; it will at least confirm whether that is the source of the error.
0 Kudos
JamesRamm
New Contributor II
I believe I have solved the issue. I have just done 6 tests with no error...

However, I have not solved the problem of why the issue occurred.

My original 'worker' function called a number of arcpy commands in sequence:

- first it converted a point to an arcpy point using Point and PointGeometry
- It called the SnapPourPoint tool which output a temporary raster
- It called the Watershed tool, which output another temporary raster
- It called the RasterToPolygon_conversion tool to create a Polygon object

I edited the original code so that the only tool used with parallel processing is the Watershed tool (which is most time consuming).

The point conversion and snap pour point tool is called in its own loop separately and the results stored to disk.

The watershed calculation is then performed using parallel processing and the resulting rasters stored to disk.

The conversion is then performed in a loop on these results. Finally, all those intermediate files

The parallel code now looks like this:

def watershed_pll(data, flowdir, flowacc, tempfolder proc=4):
    """ Calculate the watershed for each station point using parallel processing """
    
    # Preprocessing of data points for watershed calculation
    pnts = []    
    for key, val in data.iteritems():
        pnts.append(prepare_point(val, flowacc, key, tempfolder))
    pp = dict(pnts)
    
    # Do the watershed calc in parallel
    pool = Pool(processes = proc)
    jobs = []        
    for key, val in pp.iteritems():
        jobs.append(pool.apply_async(pll_watershed, (val, key, flowdir, flowacc, temp)))    
    pool.close()
    pool.join()  

    # Converte watershed rasters to polygons
    ws_rast = dict([job.get() for job in jobs])
    res = []
    for key, val in ws_rast.items():
        res.append(af.ws_poly(val, key, mod.temp))
    pols = dict(res)
    return pols


It looks much bigger - there are now 3 loops instead of one. There is also a lot of dictionary formatting etc to preserve the results format/order for the next loop. This is all a bit more expensive, but happily, the watershed tool is by far the most time consuming and is parallelised. So there is still a significant speed up from single processing.

Hopefully, the other 2 loops can also be parallelised independently to give another speed up... I'll look into that next.

My only worry is that the intermediate files cannot be deleted each iteration as they need to be used in the next loop..this could potentially be a problem with big data..

I am also unsure what caused the initial problem in the first place, which would be good to know.
CodyScott
Occasional Contributor
In my experience with multiprocessing and pyscripter i have learned a few things about this.

Similar to you i had a nightmare trying to get scripts to complete correctly, they would often power through 200 rasters or so then simply stop for no reason...

The solution i had determined in my case (with rasters) was that if i attempted to write the rasters all to the root folder then it would fail. Thus, i simply had each process generate a new folder with a number on the end which would then become the destination folder.

After all my rasters had been processed you can then step through the folders and collect them all into a single merged file at the end.

This was the only solution that i could find that worked. The other one was creating "in_memory" versions of the base data each time a process ran. You'll need to manage the data and make sure you delete the files created otherwise you'll run out of memory.

I can toss the code up if that would be helpful..

This: http://blogs.esri.com/esri/arcgis/2012/09/26/distributed-processing-with-arcgis-part-1/
and this: http://pythongisandstuff.wordpress.com/2013/07/31/using-arcpy-with-multiprocessing-%E2%80%93-part-3/

were also dead useful
JamesRamm
New Contributor II
Code would be helpful.

I was already generating a new workspace folder for each process so that is not the problem..

Unfortunately my solution above is not the total answer. I have found that for larger data, it will still fail, but usually with an ArcGIS error code stating that it is unable to execute the tool or something (will have to wait til monday to get the exact code).

So there is still a problem somewhere...
0 Kudos
CodyScott
Occasional Contributor
The other thing that i also remember from my experience is debugging multiprocess is a pain too.

I couldn't debug when running asynchronously, i had to work the kinks of the code before then hope that the multi ran fine.
0 Kudos