Multiprocessing viewsheds (easy)

07-21-2013 04:08 AM
nb (New Contributor)
Hi there,
I have been searching the forum for this, but even though it seems the solution should be quite easy to come up with, I couldn't find one.

My system: ArcGIS 10.1 SP1, Python 2.7.2, i7-4770K 3.5 GHz, 16 GB DDR3 RAM, SSD drive


So here is my (simple) problem: I want to take advantage of the multiprocessing module to speed up viewshed calculations.

In order to get to know this function, I set up a folder containing 156 shapefiles, each containing one single point. Now I want to calculate viewsheds for each of these points using the multiprocessing module. It starts out fine: all cores are in use up to 100% and the right viewsheds are being calculated. But then it fails, telling me either that "viewshe_demt1" already exists or that a second person or application is using this folder. Below are the error messages and my current code. I discovered that the code is producing folders called "viewshe_demtX" in the "scriptx" folder, which I don't really understand, because I specified the folder for the output of the viewshed analysis to be "scriptxy".

1) ExecuteError: ERROR 010429: "Error in GRID IO: CellLyrCreateInternal: Grid c:\studium\project\scriptx\viewshe_demt1 already exists."
2) (Couldn't find the error-code): "Another person or application is accessing this directory"

import arcinfo, os, re, multiprocessing
import arcpy
from arcpy import env
from arcpy.sa import *

arcpy.CheckOutExtension("Spatial")
env.workspace = "C:/studium/project/scriptx"
arcpy.env.overwriteOutput = True

arcpy.CreateFolder_management("C:/studium/project", "scriptxy")

def update_shapefiles(shapefile):
    '''Worker function'''
    outViewshed = Viewshed("C:/studium/00_Master/clean/demtora_okay1", shapefile, 2, "CURVED_EARTH", 0.15)
    mystring = "C:/studium/project/scriptxy/"+shapefile[26:37]
    outViewshed.save(mystring)
        
# End update_shapefiles
def main():
    ''' Create a pool class and run the jobs.'''
    arcpy.env.workspace = "C:/studium/project/scriptx"
    print arcpy.env.workspace
    fcs = arcpy.ListFeatureClasses('turbine*')
    fc_list = [os.path.join("C:/studium/project/scriptx", fc) for fc in fcs]
    print fc_list
    pool = multiprocessing.Pool()
    pool.map(update_shapefiles, fc_list)

    # Synchronize the main process with the job processes to
    # ensure proper cleanup.
    pool.close()
    pool.join()
    # End main
 
if __name__ == '__main__':
    main()


I basically used the first code example from http://blogs.esri.com/esri/arcgis/2012/09/26/distributed-processing-with-arcgis-part-1/ and modified it (obviously poorly) to produce viewsheds. My normal viewshed analysis takes about 8 hours for 156 viewsheds, and the CPU utilization never exceeds 12%.
I think the problem with the new code has something to do with my file naming and that the code is otherwise fine.
Please help me get it to work properly. Any advice is welcome.
StacyRendall1 (Occasional Contributor III)
Two possible causes:

  1. Something is happening with mystring that isn't what you expect, or Viewshed has some limitations on allowed paths (I don't know, so check the docs for this).

  2. You are using pool.map(), but I think apply_async might be better for this task, as the processes do not need to be synchronised (i.e. you don't care how long each process takes; each chunk can finish whenever...). I'm not sure that this is causing your issues, but it can't hurt to try it with async.


You could also try it with Parallel Python rather than Multiprocessing. Some of my arcpy workflows will only work with one, while others only with the other. My blog (link below) has some posts describing the use of both Multiprocessing and Parallel Python.
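For reference, a minimal Parallel Python sketch of the same job-submission pattern might look like this (update_shapefiles and fc_list are from your code; the modules tuple is an assumption - pp imports those modules in each worker, but your worker function may still need its own imports, so check the pp docs):

import pp

job_server = pp.Server()  # by default uses all detected cores
jobs = []
for fc in fc_list:
    # submit the worker function, its argument tuple, and the modules
    # each worker process needs to import
    jobs.append(job_server.submit(update_shapefiles, (fc,),
                                  modules=('arcpy', 'arcpy.sa')))
for job in jobs:
    job()  # calling the job object blocks until its result is ready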

To check mystring

Print statements don't work well from within multiprocessing, so to test mystring you need to make the code run sequentially and print it out, like this:
import arcinfo, os, re, multiprocessing
import arcpy
from arcpy import env
from arcpy.sa import *

arcpy.CheckOutExtension("Spatial")
env.workspace = "C:/studium/project/scriptx"
arcpy.env.overwriteOutput = True

arcpy.CreateFolder_management("C:/studium/project", "scriptxy")

def update_shapefiles(shapefile):
    '''Worker function'''
    outViewshed = Viewshed("C:/studium/00_Master/clean/demtora_okay1", shapefile, 2, "CURVED_EARTH", 0.15)
    mystring = "C:/studium/project/scriptxy/"+shapefile[26:37]
    print mystring
    outViewshed.save(mystring)
        
# End update_shapefiles
def main():
    ''' Create a pool class and run the jobs.'''
    arcpy.env.workspace = "C:/studium/project/scriptx"
    print arcpy.env.workspace
    fcs = arcpy.ListFeatureClasses('turbine*')
    fc_list = [os.path.join("C:/studium/project/scriptx", fc) for fc in fcs]
    print fc_list
    for fc in fc_list:
        update_shapefiles(fc)

    # End main
 
if __name__ == '__main__':
    main()


Comment out the actual calculation steps in the function and it will essentially just print all the paths (without you having to wait for ages).
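As an aside, if mystring does turn out wrong, the hard-coded slice shapefile[26:37] is the likely culprit. Here is a sketch of a less fragile alternative (output_path is a hypothetical helper, and it assumes the output should simply be named after the input shapefile; note that Esri GRID names are limited to 13 characters):

import os

def output_path(shapefile):  # hypothetical helper
    # derive the name from the shapefile itself rather than slicing
    # fixed character positions out of the full path
    base = os.path.splitext(os.path.basename(shapefile))[0]
    return os.path.join("C:/studium/project/scriptxy", base[:13])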

Or you can get the processes to return an output of mystring, which will work with the code below.

Asynchronous

Essentially something like this:
    jobs = [] # job queue
    pool = multiprocessing.Pool()
    for fc in fc_list:
        # send job to be processed - (fc, ) makes it a tuple of length 1
        jobs.append(pool.apply_async(update_shapefiles, (fc, )))
    for job in jobs: # collect results
        job.get()
    del jobs


You might need to return something for job.get() to work, so you could get your function to return mystring and do print job.get() instead of just job.get().
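In other words, a minimal sketch of that change (only the return line in the worker and the collection loop differ from the code above):

def update_shapefiles(shapefile):
    '''Worker function, now returning its output path'''
    outViewshed = Viewshed("C:/studium/00_Master/clean/demtora_okay1", shapefile, 2, "CURVED_EARTH", 0.15)
    mystring = "C:/studium/project/scriptxy/" + shapefile[26:37]
    outViewshed.save(mystring)
    return mystring  # the parent process can now print this

# ... then in main(), after submitting the jobs as above:
for job in jobs:  # collect results
    print job.get()  # prints the path each worker produced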
nb (New Contributor)
First of all, thank you for your quick answer. In the last days I have spent a lot of time researching my problem. I found a script and edited it further. The script takes my multiple-point file, "splits" it up into single point features, buffers each of them to know the extent of the elevation raster (DEM) for the viewshed analysis, and creates so-called multiple ring buffers.
It works fine (and is very fast) except for one mysterious problem that kept me up for the last nights. I searched everywhere but couldn't find anything about it, but then I am not a programmer and my Python knowledge is very restricted.
So here comes the mysterious problem: even though my multiple-point file is valid and has no broken geometry or anything, not every single point file is being buffered. For some crazy reason the buffer analysis as well as the multiple ring buffer analysis skip certain points. If I manually delete these points, the error passes to the next point. I also tried to buffer and multibuffer the whole multiple-point file, and that works fine. The error has to be somewhere in the script... so here is my newest version. I tried to comment it as best as I could.

# -*- coding: cp1252 -*-
# Imports...
import arcpy
from arcpy.sa import *
from arcpy import env
import multiprocessing, ntpath, os, shutil, time, math
from re import split

#overwrite is now possible
arcpy.gp.overwriteOutput = True

#create folders
arcpy.CreateFolder_management("C:/studium", "project")
arcpy.CreateFolder_management("C:/studium/project", "script0")
arcpy.CreateFolder_management("C:/studium/project", "script1")
arcpy.CreateFolder_management("C:/studium/project", "bufferRings")
arcpy.CreateFolder_management("C:/studium/project", "tmp")



def calcVS(argsCSV):
    import arcpy
    '''Worker function'''
    #splits the comma-separated string (of 4 items) so every item can be accessed
    argsList = split(",", argsCSV)
    vsFC = argsList[0] #item 1 (path of the singlepoint-shape)
    elevRas = argsList[1] #item2 (path of the dem)
    outWS = argsList[2] #item3 (output path)
    NABE = float(argsList[3]) #item4 (height)

    head, tail = ntpath.split(vsFC) # split into head and tail

    #creates a path for scratch workspace 
    MbrPath =os.path.join("c:/studium/project/tmp", "MRB_"+ tail[:-4])
    #makes the directory
    os.mkdir(MbrPath)
    #sets the scratch workspace
    arcpy.env.scratchWorkspace = MbrPath

    #bufferzones will later be calculated, but I spared you the calculation
    Zone1= 500
    Zone2= 1000
    Zone3= 2000
    Zone4= 5000
    Zone5= 9000
    Zone6=15000

    #create multiple buffer rings for every pointfile with specific zones
    arcpy.MultipleRingBuffer_analysis(vsFC , "c:/studium/project/bufferRings/Rings_%s" % (tail), str(Zone1)+";"+str(Zone2)+";"+str(Zone3)+";"+str(Zone4)+";"+str(Zone5)+";"+str(Zone6) , "meters" , "", "ALL")

    #delete the scratch directory for the multiple buffer rings
    shutil.rmtree(MbrPath)

    #the buffering for the extent. Viewsheds have a maximum of 15 kilometers, so
    #the DEM extent can be scaled down to that extent as well
    buffVS = os.path.join(outWS, "bu%s" % (tail))
    arcpy.Buffer_analysis(vsFC, buffVS, "RADIUS2")
    arcpy.env.extent = buffVS
    arcpy.Delete_management(buffVS)

    #the scratch workspace for the viewshed analysis
    scratchWS = os.path.join(outWS, tail[:-4])
    os.mkdir(scratchWS)
    arcpy.env.scratchWorkspace = scratchWS

    #check out the license
    arcpy.CheckOutExtension("Spatial")

    #create viewsheds
    outViewshed = Viewshed(elevRas, vsFC,1,"CURVED_EARTH",0.15)
    outViewshed.save(os.path.join(outWS, "%s.tif" % (tail[:-4])))

    #delete second scratch workspace
    shutil.rmtree(scratchWS)
    #delete the single point file
    arcpy.Delete_management(vsFC)



if __name__ == "__main__":
    obsShp = "C:/studium/00_Master/clean/drei.shp" #my multiple point-file 
    uidField = "FID" # unique identifier of pointfile table
    Hoehe= "NABENHOEHE" #field of heights of pointfile table
    elevationRas = "C:/studium/00_Master/clean/demtora_okay1" #digital elevation model
    outputWS = "C:/studium/project/script1" 
    arcpy.env.workspace = outputWS #sets the workspace

    start = time.clock() #remembers the time for the duration calculation at the end


    obsList=[] #create empty list for FIDs
    HoehenList =[] #create empty list for heights
    arcpy.MakeFeatureLayer_management (obsShp, "pts")
    # make temporary layer that can be searched, then search it and save
    # FID and GESAMTHOEH from every row in a list
    with arcpy.da.SearchCursor('pts',['FID','GESAMTHOEH']) as cursor:
        for row in cursor:
            HoehenList.append(row[1])
            obsList.append(row[0])
            
    vsList = []#create empty list for every value that shall be passed to calcVS

    for obs in obsList:
        obsFileName = "Vshed%s.shp" % (str(obs).zfill(4))
        HIES = HoehenList[obs] #save inherent height in a variable
        #make featureclasses for each point
        arcpy.Select_analysis(obsShp, obsFileName, uidField+" = "+str(obs))
        #fill list of lists for every value that shall be passed to calcVS 
        vsList.append("%s,%s,%s,%s" % (os.path.join(outputWS, obsFileName), elevationRas, outputWS, HIES))
        print vsList

    pool = multiprocessing.Pool()# starts multiprocessing

    pool.map(calcVS, vsList)
    # chops the iterable into a number of chunks and submits them to calcVS 

    pool.close()# stops sending more data
    pool.join()# waits for processes to finish and terminates them

    #calculate duration
    end = time.clock()
    duration = end - start
    hours, remainder = divmod(duration, 3600)
    minutes, seconds = divmod(remainder, 60)
    print "Completed in %dh:%dm:%fs" % (hours, minutes, seconds)

    #shows a countdown before calling the next script (so the user can intervene)
    countdown = 5
    for j in range(1, countdown):
        print str(countdown - j) + " seconds until the next script starts"
        time.sleep(1)

    #here the next file would be called, but I disabled it
    #execfile("c:/studium/00_Master/clean/10_2.py")


@ stacy rendall:
I tried to implement apply_async but I couldn't get it to work. All the examples I found about it on the internet suggested using the map function. But I'm starting to think my error might have something to do with it.
P.S.: I like your blog and have been there even before the initial post, but it is definitely too complicated for me to understand it completely.
nb (New Contributor)
For some crazy reason the buffer analysis as well as the multiple ring buffer analysis skip certain points. If I manually delete these points, the error passes to the next point.


If I use multiprocessing, the buffers and multibuffer rings of single points 13, 55, 75, 99, 105, 110, 115 out of 156 single point features are not created.
If I use single-core processing, so the normal stuff without modules and pool.map, then it is just 13 and 99 that are skipped.
I find this so frustrating... especially because it doesn't happen if I buffer or multibuffer the whole multipoint shapefile.
nb (New Contributor)
Okay, I debugged it step by step and finally found out what's causing this strange behaviour:
The multiple buffer rings (as well as the normal buffer) work fine! They buffer all the feature classes. When I set
arcpy.env.extent = buffVS
it somehow deletes my buffer feature class.
I checked this phenomenon 5 times to be extra sure that this is real. I edited the script back and forth, and only this code causes the error. It still cannot have anything to do with my point data, because when I don't execute the line with
arcpy.env.extent = buffVS

it calculates the buffer files and they ALL have a valid extent. I triple-checked.

I'm pretty confused by this behaviour of arcpy.env.extent = path. Why does it kill my buffer feature class, and why does it work with all the other feature classes?



Update: well, it seems like every time one process (of the 8 running simultaneously) sets the environment extent to a certain extent, another process that is also running through the script uses this "global" extent and tries to create the next multiple buffer ring with it. I'm not too sure about it, because stuff like this makes my brain hurt.
I tried to find a solution in which not the "global" extent (env.extent) is used, but one that applies directly to the raster of the viewshed analysis. But all my efforts have failed so far. Any ideas?
StacyRendall1 (Occasional Contributor III)
Update: well, it seems like every time one process (of the 8 running simultaneously) sets the environment extent to a certain extent, another process that is also running through the script uses this "global" extent and tries to create the next multiple buffer ring with it. I'm not too sure about it, because stuff like this makes my brain hurt.
I tried to find a solution in which not the "global" extent (env.extent) is used, but one that applies directly to the raster of the viewshed analysis. But all my efforts have failed so far. Any ideas?


OK. Good job working it out! It is really useful to be able to do that.

Env stands for environment; the things you set in there will affect all Arc tools. It is called a 'global' variable, as any tool that runs will check it when it needs to. This can be quite problematic when parallelising problems: if 8 worker processes are running near-simultaneously, the value will get changed 8 times, and you can't really guarantee what the value will be when each process accesses it later on. The same is true for the scratch workspace - this should not be set in the worker process code; if you need to, make your own temporary folders/GDBs (see the sketch below). The moral of the story is that you cannot reliably set any environment variables in your worker process.
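For example, a minimal sketch of a per-process temporary folder using only the standard library (tempfile.mkdtemp generates a unique directory name, so simultaneous workers can't collide; the prefix and parent folder here are just examples):

import shutil, tempfile

def calcVS(argsCSV):
    # every call gets its own unique scratch folder instead of a
    # shared environment setting
    tmpDir = tempfile.mkdtemp(prefix='vs_', dir='c:/studium/project/tmp')
    try:
        pass  # ...do the geoprocessing, writing intermediates into tmpDir...
    finally:
        shutil.rmtree(tmpDir)  # always clean up, even if a tool fails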

The only thing you can do is what you were trying to do: somehow define the extent so that it applies only to the files being used by the worker process. To do this you need to understand the tools really well... I have no experience with these tools, sorry...
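One idea worth testing (just a sketch - I haven't used Viewshed, so whether clipping the DEM is acceptable for your analysis is an assumption): instead of setting env.extent, clip the DEM to the buffer's extent inside the worker, so the extent travels with the data rather than with a global setting. clipDEM is a hypothetical path; the other names are from your script:

# get the buffer's bounding box and clip the DEM to it
ext = arcpy.Describe(buffVS).extent
rectangle = "%s %s %s %s" % (ext.XMin, ext.YMin, ext.XMax, ext.YMax)
clipDEM = os.path.join(scratchWS, "demclip.tif")
arcpy.Clip_management(elevRas, rectangle, clipDEM)

# run the viewshed on the clipped raster - no env.extent needed
outViewshed = Viewshed(clipDEM, vsFC, 1, "CURVED_EARTH", 0.15)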

I also notice that there is a lot of code in your worker process (~23 functional lines); this is way too much. It should be stripped back to the bare minimum: only the most CPU-intensive parts and the absolutely essential other components that support them. What I want you to do is run your worker process sequentially with timers around every operation. This will be a pain, but it is essential for working out what the actual thing you need to parallelise is. You can do it like this:
tS = time.clock()
### arcpy.SomeOperation()
arcpy.AddMessage('SomeOperation took: %.2f seconds' % (time.clock() - tS))


When you read my blog, did you work through all three posts on multiprocessing? I tried to make them describe a simple and clear process for getting a general problem to parallelise, but they could still need some work to increase clarity. I suggest you do each one in order and copy any code across by manually typing it - this really helps lock it in. If you are still getting stuck with the posts, please let me know what is confusing and I will try to improve them!

You also mention that apply_async doesn't work for you - can you provide any more information?