import arcpy
import multiprocessing
def simpleFunction(arg):
    """Return *arg* tripled; trivial worker used to exercise the pool."""
    tripled = arg * 3
    return tripled
if __name__ == "__main__":
    # Smoke test: fan 10 trivial jobs out to a multiprocessing pool and
    # collect the results, reporting progress through the ArcGIS messenger.
    arcpy.AddMessage(" Multiprocessing test...")
    pool = multiprocessing.Pool()
    try:
        # Submit all jobs asynchronously, then harvest results in order.
        jobs = [pool.apply_async(simpleFunction, (i,)) for i in range(10)]
        for job in jobs:  # collect results from the job server
            print(job.get())
    finally:
        # BUG FIX: the pool was never shut down, leaking worker processes.
        pool.close()
        pool.join()
    arcpy.AddMessage(" Complete")
    print("Complete")
# Description
# -----------
# This master script regulates the execution of the script run_union_slave.py (for each tile)
# Author: Chris Snyder, WA Department of Natural Resources, chris.snyder(at)wadnr.gov
import sys, string, os, shutil, time, traceback, glob
try:
    #Defines some functions
    # showPyMessage(): appends the current value of the global 'message' to the
    # log file and echoes it to stdout, prefixed with a timestamp.
    # NOTE(review): the open() handle is never explicitly closed, and the bare
    # except silently swallows any I/O error (best-effort logging).
    def showPyMessage():
        try:
            print >> open(logFile, 'a'), str(time.ctime()) + " - " + str(message)
            print str(time.ctime()) + " - " + str(message)
        except:
            pass
    # launchProcess(): spawns one slave-script process for the given tile
    # number. The very last tile (or a single-processor machine) is run with
    # P_WAIT so this master blocks until it finishes; otherwise P_NOWAIT lets
    # several tiles run concurrently.
    def launchProcess(tileNumber):
        global message
        global numberOfProcessors
        message = "Processing tile #" + str(tileNumber); showPyMessage()
        #Added this to address bug with indexTiles.shp used in the overlay tools in v9.2+ (can't run more than 1 overlay process at once)
        # Each child gets its own TEMP/TMP directory so concurrent overlay
        # operations do not collide in the shared geoprocessing temp area.
        newTempDir = r"C:\temp\gptmpenvr_" + time.strftime('%Y%m%d%H%M%S')
        os.mkdir(newTempDir)
        os.environ["TEMP"] = newTempDir
        os.environ["TMP"] = newTempDir
        message = "Set TEMP and TMP variables to " + newTempDir; showPyMessage()
        #End bug fix...
        # argv for the child: python.exe <slaveScript> <root> <tileNumber> <yyyymmdd>
        parameterList = [pythonExePath, slaveScript, root, str(tileNumber), yyyymmdd]
        if tileNumber <= indxPolyFeatureCount:
            if tileNumber == indxPolyFeatureCount or numberOfProcessors == 1:
                message = "Waiting for tile #" + str(indxPolyFeatureCount) + " to finish..."; showPyMessage()
                os.spawnv(os.P_WAIT, pythonExePath, parameterList)
            else:
                os.spawnv(os.P_NOWAIT, pythonExePath, parameterList)
                time.sleep(1); message = "Waiting a few seconds"; showPyMessage()
    #Specifies the root variable, makes the logFile variable, and does some error checking...
    dateTimeStamp = time.strftime('%Y%m%d%H%M%S')
    root = sys.argv[1] #r"E:\1junk\overlay_20060531"
    yyyymmdd = sys.argv[2] #this is passed from the master script since the tiles may be processed over a 2+ day period
    if os.path.exists(root)== False:
        print "Specified root directory: " + root + " does not exist... Bailing out!"
        sys.exit()
    scriptName = sys.argv[0].split("\\")[len(sys.argv[0].split("\\")) - 1][0:-3] #Gets the name of the script without the .py extension
    logFile = root + "\\log_files\\" + scriptName + "_" + dateTimeStamp[:8] + ".log" #Creates the logFile variable
    if os.path.exists(root + "\\log_files") == False: #Makes sure log_files exists
        os.mkdir(root + "\\log_files")
    if os.path.exists(logFile)== True:
        os.remove(logFile)
        message = "Deleting log file with the same name and datestamp... Recreating " + logFile; showPyMessage()
    workspaceDir = root + "\\index_tiles"
    if os.path.exists(workspaceDir)== True:
        message = "Overlay directory: " + workspaceDir + " already exist... Deleting and recreating " + workspaceDir; showPyMessage()
        shutil.rmtree(workspaceDir)
    else:
        message = "Creating index tiles directory: " + workspaceDir; showPyMessage()
    # Recreated in both branches above (indentation reconstructed from the
    # flattened paste -- mkdir must run whether or not the directory existed).
    os.mkdir(workspaceDir)
    #Process: Finds python.exe
    pythonExePath = ""
    for path in sys.path:
        if os.path.exists(os.path.join(path, "python.exe")) == True:
            pythonExePath = os.path.join(path, "python.exe")
    if pythonExePath != "":
        message = "Python.exe file located at " + pythonExePath; showPyMessage()
    else:
        # NOTE(review): showPyError() is not defined anywhere in this script --
        # this branch would raise NameError (caught by the outer except).
        message = "ERROR: Python executable not found! Exiting script...."; showPyError(); sys.exit()
    #Determines the number of processors on the machine...
    # NOTE(review): NUMBER_OF_PROCESSORS is a Windows-only environment variable;
    # os.environ.get() returns None elsewhere and int(None) would raise.
    numberOfProcessors = int(os.environ.get("NUMBER_OF_PROCESSORS"))
    maxNumberOfProcessors = 3 #The maximum number of processors you want to use
    if numberOfProcessors > maxNumberOfProcessors:
        numberOfProcessors = maxNumberOfProcessors
    #Determines the number of tiles in the index layer (tile numbers are assumed to be sequential)
    #Note: I didn't use gp.getcount so that this script wouldn't use the gp at all (and eat up more memory)
    indxPolyCountFile = glob.glob(root + "\\log_files\\therearethismanytiles_*.txt")
    if len(indxPolyCountFile) == 0:
        message = "Can't find index polygon count .txt file in " + root + "\\log_files" + "! Exiting script..."; showPyMessage()
        sys.exit()
    # The tile count is encoded in the marker file name: therearethismanytiles_<N>.txt
    indxPolyFeatureCount = int(indxPolyCountFile[0].split("_")[-1][0:-4])
    if indxPolyFeatureCount == 0:
        message = "Index polygon count is 0! Exiting script..."; showPyMessage()
        sys.exit()
    tileNumber = 1
    numberOfProcesses = 0
    slaveScript = r"\\Snarf\am\div_lm\ds\gis\ldo\current_scripts\run_union_slave_v93.py"
    tilesThatAreProcessingList = []
    tilesThatFinishedList = []
    tilesThatBombedList = []
    #Process: This while loop initialy launches <numberOfProcessors> instances of the slave script
    while numberOfProcesses < numberOfProcessors:
        numberOfProcesses = numberOfProcesses + 1
        tilesThatAreProcessingList.append(tileNumber)
        launchProcess(tileNumber)
        tileNumber = tileNumber + 1
    #Process: This while loop checks for tiles that finished or bombed, if there are, new slave scripts are launched
    # The slave signals its outcome by dropping <slave>_isalldone_<tile>.txt or
    # <slave>_bombed_<tile>.txt into log_files; this loop polls for those
    # marker files every 5 seconds and back-fills freed slots with new tiles.
    while len(tilesThatBombedList) + len(tilesThatFinishedList) < indxPolyFeatureCount:
        isDoneList = glob.glob(root + "\\log_files\\" + slaveScript.split("\\")[len(slaveScript.split("\\")) - 1][0:-3] + "_isalldone_*") #makes a list of the .txt file created by the slave script
        doneBombedList = glob.glob(root + "\\log_files\\" + slaveScript.split("\\")[len(slaveScript.split("\\")) - 1][0:-3] + "_bombed_*")
        if len(isDoneList) == 0 and len(doneBombedList) == 0: #if there are no .txt files, wait for x number of seconds
            time.sleep(5)
        else: #else if there are .txt files...
            if len(isDoneList) > 0: #if there are tiles that are "_isalldone_"
                for isDoneItem in isDoneList: #for each .txt file indicating completion...
                    # The finished tile number is parsed out of the marker file name
                    tileThatIsDone = int(isDoneItem[isDoneItem.rfind("_") + 1:isDoneItem.rfind(".")])
                    os.remove(root + "\\log_files\\" + slaveScript.split("\\")[len(slaveScript.split("\\")) - 1][0:-3] + "_isalldone_" + str(tileThatIsDone) + ".txt")
                    message = "Tile #" + str(tileThatIsDone) + " is done!!!"; showPyMessage()
                    tilesThatFinishedList.append(tileThatIsDone)
                    tilesThatAreProcessingList.remove(tileThatIsDone)
                    # Back-fill the freed slot with the next tile in sequence
                    tilesThatAreProcessingList.append(tileNumber)
                    launchProcess(tileNumber)
                    tileNumber = tileNumber + 1
                    time.sleep(5)
            if len(doneBombedList) > 0: #if there are tiles that "_bombed_"
                for doneBombedItem in doneBombedList: #for each .txt file indicating failure...
                    tileThatBombed = int(doneBombedItem[doneBombedItem.rfind("_") + 1:doneBombedItem.rfind(".")])
                    os.remove(root + "\\log_files\\" + slaveScript.split("\\")[len(slaveScript.split("\\")) - 1][0:-3] + "_bombed_" + str(tileThatBombed) + ".txt")
                    message = "Tile #" + str(tileThatBombed) + " bombed!!!"; showPyMessage()
                    tilesThatBombedList.append(tileThatBombed)
                    tilesThatAreProcessingList.remove(tileThatBombed)
                    tilesThatAreProcessingList.append(tileNumber)
                    launchProcess(tileNumber)
                    tileNumber = tileNumber + 1
                    time.sleep(5)
            message = "Tiles that are currently processing: " + str(tilesThatAreProcessingList); showPyMessage()
            message = "Tiles that are done: " + str(tilesThatFinishedList); showPyMessage()
            message = "Tiles that bombed: " + str(tilesThatBombedList); showPyMessage()
            time.sleep(5)
    if len(tilesThatBombedList) > 0:
        message = "ERROR - these tiles failed to process: " + str(tilesThatBombedList); showPyMessage()
        sys.exit()
    message = "ALL DONE!"; showPyMessage()
    # Drop a marker file so a higher-level controller can tell this master finished
    print >> open(root + "\\log_files\\" + scriptName + "_isalldone.txt", 'a'), scriptName + "_isalldone.txt"
except:
    # Outer catch-all: log the traceback of any unhandled error before exiting.
    message = "\n*** LAST GEOPROCESSOR MESSAGE (may not be source of the error)***"; showPyMessage()
    message = "\n*** PYTHON ERRORS *** "; showPyMessage()
    message = "Python Traceback Info: " + traceback.format_tb(sys.exc_info()[2])[0]; showPyMessage()
    message = "Python Error Info: " + str(sys.exc_type)+ ": " + str(sys.exc_value) + "\n"; showPyMessage()
# NOTE: As soon as more than one core is used, the processing is no longer stable.
#Import some modules
import os, random, subprocess, sys, time
#Process: Define a function that will launch the processes and update the job dictionary accordingly
def launchProcess(jobId):
    """Start the child subprocess for *jobId* and mark it IN_PROGRESS.

    Reads the executable path, child-script path, and the two input variables
    from the module-level job dictionary, stores the resulting Popen object
    back into the job entry, and flips the job's status field.
    """
    global jobDict  # module-level job table (read and mutated here)
    job = jobDict[jobId]
    exePath = job[0]
    scriptPath = job[1]
    firstInput = job[2][0]   # input variables come from the job entry
    secondInput = job[2][1]
    job[4] = subprocess.Popen(
        [exePath, scriptPath, str(firstInput), str(secondInput)],
        shell=False,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    job[3] = "IN_PROGRESS"  # indicate the job has been launched
#Determine how many processes to run concurrently
numberOfProcessorsToUse = 3 #The number of processors you (the user) wants to use
# BUG FIX: NUMBER_OF_PROCESSORS is a Windows-only environment variable;
# os.environ.get() returns None when it is absent and int(None) raises
# TypeError. Fall back to multiprocessing.cpu_count() (already imported at the
# top of the file) so the cap works cross-platform.
availableProcessors = int(os.environ.get("NUMBER_OF_PROCESSORS") or multiprocessing.cpu_count())
if numberOfProcessorsToUse > availableProcessors:
    numberOfProcessorsToUse = availableProcessors
#Populate a "job dictionary" to keep track of all the subprocess jobs and all their various inputs and outputs
childScriptPath = r"C:\csny490\simple_subprocess_child.py"
jobDict = {}
for jobId in range(1, 21): #example population: 20 jobs with ids 1..20
    inputVar1 = random.randrange(1, 6) #seconds the child process will sleep for (1-5)
    inputVar2 = random.randrange(2, 9) #in the child script, a value > 6 raises on purpose so the job ends up 'FAILED'
    #Format of jobDict[jobId] = [applicationExePath, childScriptPath, [list of input variables], status {"NOT_STARTED", "IN_PROGRESS", "SUCCEEDED", "FAILED"}, subprocess.Popen object]
    jobDict[jobId] = [os.path.join(sys.prefix, "python.exe"), childScriptPath, [inputVar1, inputVar2], "NOT_STARTED", None]
#Process: Kick off some processes, monitor the processes, and start new processes as others finish
kickOffFlag = False #Indicate the process kick off has not yet occured
# BUG FIX: every comprehension below originally tested jobDict[3] (the *entry*
# for job id 3, a list) instead of jobDict[i][3] (the status of job i), so no
# status comparison ever matched: the initial kickoff loop never stopped and
# the outer loop could never terminate. All comprehensions now index by i.
while len([i for i in jobDict if jobDict[i][3] in ("SUCCEEDED", "FAILED")]) < len(jobDict):
    if kickOffFlag == False:
        # Launch the initial batch of up to <numberOfProcessorsToUse> jobs.
        # The extra NOT_STARTED check guards against an IndexError when there
        # are fewer jobs than processors.
        while (len([i for i in jobDict if jobDict[i][3] != 'NOT_STARTED']) < numberOfProcessorsToUse
               and len([i for i in jobDict if jobDict[i][3] == 'NOT_STARTED']) > 0):
            launchProcess([i for i in jobDict if jobDict[i][3] == 'NOT_STARTED'][0]) #Feed the next waiting jobId to launchProcess()
        kickOffFlag = True #Set the flag as True once we have done the initial kickoff
    # A finished child polls non-None; its returncode says success or failure.
    for jobId in [i for i in jobDict if jobDict[i][3] == 'IN_PROGRESS' and jobDict[i][4].poll() is not None]:
        if jobDict[jobId][4].returncode == 0: #return code 0 indicates success (no sys.exit(1) in the child)
            jobDict[jobId][3] = "SUCCEEDED"
        if jobDict[jobId][4].returncode > 0: #a positive return code indicates the child called sys.exit(1)
            jobDict[jobId][3] = "FAILED"
        if len([i for i in jobDict if jobDict[i][3] == 'NOT_STARTED']) > 0: #if jobs are still waiting, launch the next one in line
            launchProcess([i for i in jobDict if jobDict[i][3] == 'NOT_STARTED'][0])
# Report the final status of every job.
print("--------------------------------------")
for jobId in jobDict:
    print("Job ID " + str(jobId) + " = " + str(jobDict[jobId][3]))
# ---------------------------------------------------------------------------
# NOTE(review): the text below was fused onto the previous line in the paste.
# It is the CHILD script and belongs in its own file (childScriptPath above,
# C:\csny490\simple_subprocess_child.py); it is preserved here as a comment so
# it does not execute inside the parent:
#
# try:
#     import sys, time
#     var1 = int(sys.argv[1])
#     var2 = int(sys.argv[2])
#     print "Nothing to do, so sleeping for " + str(var1) + " seconds..."
#     time.sleep(var1)
#     if var2 > 6:
#         test = "oh crap" + 1 #concatenating a string and an int here is intended to throw an exception on purpose to demonstrate a failure
#     print "Epic Success!" #Note: If script runs through without error, return code will be, by default, 0 - indicating success
# except:
#     print "Epic Fail!" #Note: You can gather/parse all these print messages in the parent script using jobDict[jobId][4].communicate()
#     sys.exit(1) #Note: If script fails, return code is 1 - indicating failure - you can set the return code to be > 0 if you want...
PYTHON_EXE = os.path.join(sys.exec_prefix, 'pythonw.exe') #if you use python.exe you get a command window multiprocessing.set_executable(PYTHON_EXE)
# Observed failure when spawning workers: "TypeError: can't pickle module objects"
# Strip ESRI geoprocessor 'GP_' variables from the environment passed to the
# child process before CreateProcess is called.
# NOTE(review): this looks like a patch inside multiprocessing's Windows
# process-spawning code (_subprocess/_python_exe/cmd are defined there) --
# confirm against the target multiprocessing version.
env = dict(os.environ)
for x in list(env.iterkeys()):
    if x.startswith('GP_'):
        # BUG FIX: 'del env' destroyed the WHOLE dict (raising NameError on the
        # next iteration); only the offending key must be removed.
        del env[x]
hp, ht, pid, tid = _subprocess.CreateProcess(
    _python_exe, cmd, None, None, 1, 0, env, None, None
)