import arcpy
import multiprocessing
def simpleFunction(arg):
    """Return the argument multiplied by three.

    Works for any type supporting ``*`` with an int (numbers are tripled;
    sequences such as strings are repeated three times).
    """
    tripled = 3 * arg
    return tripled
if __name__ == "__main__":
arcpy.AddMessage(" Multiprocessing test...")
jobs = []
pool = multiprocessing.Pool()
for i in range(10):
job = pool.apply_async(simpleFunction, (i,))
jobs.append(job)
for job in jobs: # collect results from the job server
print job.get()
del job, jobs
arcpy.AddMessage(" Complete")
print "Complete"
# Description
# -----------
# This master script regulates the execution of the script run_union_slave.py (for each tile)
# Author: Chris Snyder, WA Department of Natural Resources, chris.snyder(at)wadnr.gov
import sys, string, os, shutil, time, traceback, glob
try:
    #Defines some functions
    def showPyMessage():
        # Appends the module-level 'message' to the log file and echoes it to
        # stdout, prefixed with a timestamp.  Any failure (e.g. log file not
        # writable) is deliberately swallowed so logging never kills the run.
        try:
            print >> open(logFile, 'a'), str(time.ctime()) + " - " + str(message)
            print str(time.ctime()) + " - " + str(message)
        except:
            pass
    def launchProcess(tileNumber):
        # Spawns one instance of the slave script for the given tile number.
        # The final tile (or a single-processor run) is spawned with P_WAIT so
        # the master blocks on it; all other tiles are spawned P_NOWAIT so they
        # run in parallel.
        global message
        global numberOfProcessors
        message = "Processing tile #" + str(tileNumber); showPyMessage()
        #Added this to address bug with indexTiles.shp used in the overlay tools in v9.2+ (can't run more than 1 overlay process at once)
        newTempDir = r"C:\temp\gptmpenvr_" + time.strftime('%Y%m%d%H%M%S')
        os.mkdir(newTempDir)
        os.environ["TEMP"] = newTempDir
        os.environ["TMP"] = newTempDir
        message = "Set TEMP and TMP variables to " + newTempDir; showPyMessage()
        #End bug fix...
        parameterList = [pythonExePath, slaveScript, root, str(tileNumber), yyyymmdd]
        if tileNumber <= indxPolyFeatureCount:
            if tileNumber == indxPolyFeatureCount or numberOfProcessors == 1:
                message = "Waiting for tile #" + str(indxPolyFeatureCount) + " to finish..."; showPyMessage()
                os.spawnv(os.P_WAIT, pythonExePath, parameterList)
            else:
                os.spawnv(os.P_NOWAIT, pythonExePath, parameterList)
                time.sleep(1); message = "Waiting a few seconds"; showPyMessage()
    #Specifies the root variable, makes the logFile variable, and does some error checking...
    dateTimeStamp = time.strftime('%Y%m%d%H%M%S')
    root = sys.argv[1] #r"E:\1junk\overlay_20060531"
    yyyymmdd = sys.argv[2] #this is passed from the master script since the tiles may be processed over a 2+ day period
    if os.path.exists(root) == False:
        print "Specified root directory: " + root + " does not exist... Bailing out!"
        sys.exit()
    scriptName = sys.argv[0].split("\\")[len(sys.argv[0].split("\\")) - 1][0:-3] #Gets the name of the script without the .py extension
    logFile = root + "\\log_files\\" + scriptName + "_" + dateTimeStamp[:8] + ".log" #Creates the logFile variable
    if os.path.exists(root + "\\log_files") == False: #Makes sure log_files exists
        os.mkdir(root + "\\log_files")
    if os.path.exists(logFile) == True:
        os.remove(logFile)
        message = "Deleting log file with the same name and datestamp... Recreating " + logFile; showPyMessage()
    workspaceDir = root + "\\index_tiles"
    if os.path.exists(workspaceDir) == True:
        message = "Overlay directory: " + workspaceDir + " already exist... Deleting and recreating " + workspaceDir; showPyMessage()
        shutil.rmtree(workspaceDir)
    else:
        message = "Creating index tiles directory: " + workspaceDir; showPyMessage()
    os.mkdir(workspaceDir) #(re)created in both branches above
    #Process: Finds python.exe
    pythonExePath = ""
    for path in sys.path:
        if os.path.exists(os.path.join(path, "python.exe")) == True:
            pythonExePath = os.path.join(path, "python.exe")
    if pythonExePath != "":
        message = "Python.exe file located at " + pythonExePath; showPyMessage()
    else:
        #NOTE(review): showPyError() is not defined anywhere in this script; if
        #this branch is ever taken it raises NameError, which lands in the
        #outer except block below - confirm intended behavior.
        message = "ERROR: Python executable not found! Exiting script...."; showPyError(); sys.exit()
    #Determines the number of processors on the machine...
    numberOfProcessors = int(os.environ.get("NUMBER_OF_PROCESSORS"))
    maxNumberOfProcessors = 3 #The maximum number of processors you want to use
    if numberOfProcessors > maxNumberOfProcessors:
        numberOfProcessors = maxNumberOfProcessors
    #Determines the number of tiles in the index layer (tile numbers are assumed to be sequential)
    #Note: I didn't use gp.getcount so that this script wouldn't use the gp at all (and eat up more memory)
    indxPolyCountFile = glob.glob(root + "\\log_files\\therearethismanytiles_*.txt")
    if len(indxPolyCountFile) == 0:
        message = "Can't find index polygon count .txt file in " + root + "\\log_files" + "! Exiting script..."; showPyMessage()
        sys.exit()
    indxPolyFeatureCount = int(indxPolyCountFile[0].split("_")[-1][0:-4]) #tile count is encoded in the flag-file name
    if indxPolyFeatureCount == 0:
        message = "Index polygon count is 0! Exiting script..."; showPyMessage()
        sys.exit()
    tileNumber = 1
    numberOfProcesses = 0
    slaveScript = r"\\Snarf\am\div_lm\ds\gis\ldo\current_scripts\run_union_slave_v93.py"
    tilesThatAreProcessingList = []
    tilesThatFinishedList = []
    tilesThatBombedList = []
    #Process: This while loop initialy launches <numberOfProcessors> instances of the slave script
    while numberOfProcesses < numberOfProcessors:
        numberOfProcesses = numberOfProcesses + 1
        tilesThatAreProcessingList.append(tileNumber)
        launchProcess(tileNumber)
        tileNumber = tileNumber + 1
    #Process: This while loop checks for tiles that finished or bombed, if there are, new slave scripts are launched
    while len(tilesThatBombedList) + len(tilesThatFinishedList) < indxPolyFeatureCount:
        isDoneList = glob.glob(root + "\\log_files\\" + slaveScript.split("\\")[len(slaveScript.split("\\")) - 1][0:-3] + "_isalldone_*") #makes a list of the .txt file created by the slave script
        doneBombedList = glob.glob(root + "\\log_files\\" + slaveScript.split("\\")[len(slaveScript.split("\\")) - 1][0:-3] + "_bombed_*")
        if len(isDoneList) == 0 and len(doneBombedList) == 0: #if there are no .txt files, wait for x number of seconds
            time.sleep(5)
        else: #else if there are .txt files...
            if len(isDoneList) > 0: #if there are tiles that are "_isalldone_"
                for isDoneItem in isDoneList: #for each .txt file indicating completion...
                    tileThatIsDone = int(isDoneItem[isDoneItem.rfind("_") + 1:isDoneItem.rfind(".")]) #tile number is encoded in the flag-file name
                    os.remove(root + "\\log_files\\" + slaveScript.split("\\")[len(slaveScript.split("\\")) - 1][0:-3] + "_isalldone_" + str(tileThatIsDone) + ".txt")
                    message = "Tile #" + str(tileThatIsDone) + " is done!!!"; showPyMessage()
                    tilesThatFinishedList.append(tileThatIsDone)
                    tilesThatAreProcessingList.remove(tileThatIsDone)
                    tilesThatAreProcessingList.append(tileNumber)
                    launchProcess(tileNumber) #back-fill the freed slot with the next tile
                    tileNumber = tileNumber + 1
                    time.sleep(5)
            if len(doneBombedList) > 0: #if there are tiles that "_bombed_"
                for doneBombedItem in doneBombedList: #for each .txt file indicating failure...
                    tileThatBombed = int(doneBombedItem[doneBombedItem.rfind("_") + 1:doneBombedItem.rfind(".")])
                    os.remove(root + "\\log_files\\" + slaveScript.split("\\")[len(slaveScript.split("\\")) - 1][0:-3] + "_bombed_" + str(tileThatBombed) + ".txt")
                    message = "Tile #" + str(tileThatBombed) + " bombed!!!"; showPyMessage()
                    tilesThatBombedList.append(tileThatBombed)
                    tilesThatAreProcessingList.remove(tileThatBombed)
                    tilesThatAreProcessingList.append(tileNumber)
                    launchProcess(tileNumber) #back-fill the freed slot with the next tile
                    tileNumber = tileNumber + 1
                    time.sleep(5)
            message = "Tiles that are currently processing: " + str(tilesThatAreProcessingList); showPyMessage()
            message = "Tiles that are done: " + str(tilesThatFinishedList); showPyMessage()
            message = "Tiles that bombed: " + str(tilesThatBombedList); showPyMessage()
            time.sleep(5)
    if len(tilesThatBombedList) > 0:
        message = "ERROR - these tiles failed to process: " + str(tilesThatBombedList); showPyMessage()
        sys.exit()
    message = "ALL DONE!"; showPyMessage()
    print >> open(root + "\\log_files\\" + scriptName + "_isalldone.txt", 'a'), scriptName + "_isalldone.txt" #flag file telling callers this master run completed
except:
    #Top-level catch-all: log the traceback via showPyMessage() so failures are
    #recorded in the log file rather than lost.
    message = "\n*** LAST GEOPROCESSOR MESSAGE (may not be source of the error)***"; showPyMessage()
    message = "\n*** PYTHON ERRORS *** "; showPyMessage()
    message = "Python Traceback Info: " + traceback.format_tb(sys.exc_info()[2])[0]; showPyMessage()
    message = "Python Error Info: " + str(sys.exc_type) + ": " + str(sys.exc_value) + "\n"; showPyMessage()
As soon as I use more than one core, the processing is no longer stable.
#Import some modules import os, random, subprocess, sys, time #Process: Define a function that will launch the processes and update the job dictionary accordingly def launchProcess(jobId): global jobDict #make this a global so the function can read it inputVar1 = jobDict[jobId][2][0] #Input variables are being read from the job dictionary inputVar2 = jobDict[jobId][2][1] jobDict[jobId][4] = subprocess.Popen([jobDict[jobId][0], jobDict[jobId][1], str(inputVar1), str(inputVar2)], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) jobDict[jobId][3] = "IN_PROGRESS" #Indicate the job is 'IN_PROGRESS' #Determine how many processes to run concurrently numberOfProcessorsToUse = 3 #The number of processors you (the user) wants to use if numberOfProcessorsToUse > int(os.environ.get("NUMBER_OF_PROCESSORS")): numberOfProcessorsToUse = int(os.environ.get("NUMBER_OF_PROCESSORS")) #Populate a "job dictionary" to keep track of all the subprocess jobs and all their various inputs and outputs childScriptPath = r"C:\csny490\simple_subprocess_child.py" jobDict = {} for jobId in range (1,21): #this loop is just showing how you can populate the jobId's and their input variables - in this example theere will be 20 jobs inputVar1 = random.randrange(1,6) #this variable tells each subrocess how many seconds it will "sleep" for - between 1 and 5 seconds inputVar2 = random.randrange(2,9) #in the child proces script, if inputVar2 > 6 it will throw an exception, and the subprocess will be 'FAILED' - I simply did this to create the possibility of failed subprocesses #Format of jobDict[jobId] = [applicationExePath, childScriptPath, [list of input varibles], status {"NOT_STARTED", "IN_PROGRESS", "SUCCEEDED", "FAILED"}, subprocess.Popen object] jobDict[jobId] = [os.path.join(sys.prefix, "python.exe"), childScriptPath, [inputVar1, inputVar2], "NOT_STARTED", None] #Process: Kick off some processes, monitor the processes, and start new processes as others finish 
kickOffFlag = False #Indicate the process kick off has not yet occured while len([i for i in jobDict if jobDict[3] in ("SUCCEEDED","FAILED")]) < len(jobDict): if kickOffFlag == False: while len([i for i in jobDict if jobDict[3] != 'NOT_STARTED']) < numberOfProcessorsToUse: launchProcess([i for i in jobDict if jobDict[3] == 'NOT_STARTED'][0]) #Feed the appropriate jobId to the launchProcess() function kickOffFlag = True #Set the flag as True once we have done the initial kickoff for jobId in [i for i in jobDict if jobDict[3] == 'IN_PROGRESS' and jobDict[4].poll() != None]: #if an subprocess is listed as 'IN_PROGRESS' and polls as 0 (i.e. "done" but success or failure unknown) if jobDict[jobId][4].returncode == 0: #return code of 0 indicates success (no sys.exit(1) command encountered in the child process jobDict[jobId][3] = "SUCCEEDED" if jobDict[jobId][4].returncode > 0: #return code of 1 (or another integer value) indicates failure (a sys.exit(1) command was encountered in the child process) jobDict[jobId][3] = "FAILED" if len([i for i in jobDict if jobDict[3] == 'NOT_STARTED']) > 0: #if there are still jobs = 'NOT_STARTED', launch the next one in line launchProcess([i for i in jobDict if jobDict[3] == 'NOT_STARTED'][0]) print "--------------------------------------" for jobId in jobDict: print "Job ID " + str(jobId) + " = " + str(jobDict[jobId][3])
#Child demo script: sleeps for argv[1] seconds, then deliberately fails when
#argv[2] > 6 so the parent can observe a nonzero return code.
try:
    import sys, time
    var1 = int(sys.argv[1]) #seconds to sleep
    var2 = int(sys.argv[2]) #failure trigger: values > 6 raise on purpose
    print "Nothing to do, so sleeping for " + str(var1) + " seconds..."
    time.sleep(var1)
    if var2 > 6:
        test = "oh crap" + 1 #concatenating a string an int here is intended to throw an exception on purpose to demonstrate a failure
    print "Epic Success!" #Note: If script runs through without error, return code will be, by default, 0 - indicating success
except:
    print "Epic Fail!" #Note: You can gather/parse all these print messages in the parent scrip using jobDict[jobId][4].communicate()
    sys.exit(1) #Note: If script fails, return code is 1 - indicating failure - you can set the return code to be > 0 if you want...
PYTHON_EXE = os.path.join(sys.exec_prefix, 'pythonw.exe') #if you use python.exe you get a command window multiprocessing.set_executable(PYTHON_EXE)
TypeError: can't pickle module objects
# Copy the current environment, strip the ArcGIS geoprocessor's "GP_"
# variables from it, and hand the cleaned environment to the child process
# spawned via the Win32 CreateProcess wrapper.
# FIX(reconstruction): the pasted text read "del envhp, ht, pid, tid = ...",
# which is not valid Python; the evident intent is "del env[x]" followed by
# tuple-unpacking CreateProcess's (handle, thread handle, pid, tid) return.
env = dict(os.environ)
for x in list(env.iterkeys()):
    if x.startswith('GP_'): # drop geoprocessor-injected variables
        del env[x]
hp, ht, pid, tid = _subprocess.CreateProcess(
    _python_exe, cmd, None, None, 1, 0, env, None, None
)