Multiprocessing Map Series slowing down

02-03-2021 06:34 AM
Kara_Shindle
Occasional Contributor III

I'm working on a script that uses multiprocessing to generate a series of property record card PDFs from a map series.  I learned about multiprocessing in an Advanced Python class and thought it could help with this project.  The script has to run nightly on approx. 3,300 parcels, but it takes 12+ hours because it slows down drastically as it runs.  Memory use also increases, but I have not been able to locate the memory leak (I'm newer to Python), and Spyder keeps locking up.  It is written using PyScripter and Python 3.6.12.  Ideally, it could also be run once a year to generate two PDFs for each of 75,000 parcels.

I'm developing on a laptop with an Intel(R) Core(TM) i7-6820HQ CPU @ 2.70 GHz processor (8 CPUs) and 32 GB RAM.

Below is the function.  It also has to look for building sketches and attach them to the resulting card if they exist.

The Parcel identifier is fed into the map series, generates 3 pages, and then looks for building sketches.

 

I'm looking for any guidance; the function and main script are attached as a zip file.

Here is the function:

 

import os, sys
import arcpy
arcpy.env.overwriteOutput = True

def worker(mxdPath, outFolder, name):
	"""
		This is the function that gets called and does the work of exporting the DDP for the UPI specified.
		Note that this function does not try to write to arcpy.AddMessage() as nothing is ever displayed.  If the export succeeds then it returns True, else False.
	"""
	try:
		mxdFile = arcpy.mp.ArcGISProject(mxdPath)
		cleanName = name.replace("/", "_")
		finalPDF = os.path.join(outFolder, cleanName + ".pdf")

		l = mxdFile.listLayouts("Page_1")[0]
		if not l.mapSeries is None:
			ms = l.mapSeries
			if ms.enabled:
				ms.currentPageNumber = ms.getPageNumberFromName(name)
				file1 = os.path.join(outFolder, cleanName)
				ms.exportToPDF(file1, "CURRENT", resolution=300)
				ms.refresh()

		pdfDoc = arcpy.mp.PDFDocumentOpen(finalPDF)
		l = mxdFile.listLayouts("Page_2")[0]
		if not l.mapSeries is None:
			ms = l.mapSeries
			if ms.enabled:
				ms.currentPageNumber = ms.getPageNumberFromName(name)
				pageTwo = cleanName + "t2"
				file2 = os.path.join(outFolder, pageTwo + ".pdf")
				ms.exportToPDF(file2, "CURRENT", resolution=300)
				pdfDoc.appendPages(file2)
				os.remove(file2)
				#ms.refresh()
		try:
			resSketch = ms.pageRow.SKETCH
			if os.path.exists(resSketch):
				l = mxdFile.listLayouts("Page_3")[0]
				if not l.mapSeries is None:
					ms = l.mapSeries
					if ms.enabled:
						ms.currentPageNumber = ms.getPageNumberFromName(name)
						pageThree = cleanName + "t3"
						file3 = os.path.join(outFolder, pageThree + ".pdf")
						ms.exportToPDF(file3, "CURRENT", resolution=300)
						pdfDoc.appendPages(file3)
						os.remove(file3)
						del file3, resSketch
		except:
			pass

		try:
		#Checks for existence of Sketch PDF & appends it to new PDF
			commSketch = ms.pageRow.COMMERCIAL_SKETCH
			pdfDoc.appendPages(commSketch)
		except:
			pass
		pdfDoc.saveAndClose()
		del pdfDoc, file1, file2, mxdPath, resSketch


		del mxdFile
		return True
	except arcpy.ExecuteError:
		# Geoprocessor threw an error
		arcpy.AddError(arcpy.GetMessages(2))
		print("Execute Error:", arcpy.ExecuteError)

	except Exception as e:
		tb = sys.exc_info()[2]
		print("Failed at Line %i \n" % tb.tb_lineno)
		print("Error: {} \n".format(e))
	return False

 

 

7 Replies
JeffK
Occasional Contributor III

Hi Kara,

... some of that code looks too familiar :).  I think this would be a good project to tackle with multi-threading.  There is a way to return messages from the process.  I'll look at my code to see how I made that happen, the basics were to return a tuple of (status, message) and then parse the return.  Granted, it wasn't processing 75k files with size, but it worked.

A couple weeks ago I was using 3.6 for a single threaded process of comparing two featureclasses and it kept bogging down.  Changed back to 2.7 and it breezed through it.  One of life's greatest mysteries.

Kara_Shindle
Occasional Contributor III

lol you might think so!  I was going to test your memory delete suggestion today.  To split up the 75k, I had a selection variable that asks for the tax district number and then selects those parcels, breaking the run down into chunks of 6k or less.  I didn't include it in the one I sent you to keep the code cleaner.

 

Edit: I had looked at multithreading too but didn't know enough about the two options to be able to implement it.

JeffK
Occasional Contributor III

In the PRC_multipliprocessing script, I think some of the iterations are not necessary, or can be shortened using list comprehensions.

desc = arcpy.ListFields(ddpGrid)
for field in desc:
    if field.name == "Name":
        column = field.name
idList = []
with arcpy.da.SearchCursor(ddpGrid, column) as cursor:
    for row in cursor:
        idName = row[0]
        idList.append(idName)

jobs = []
for name in idList:
    jobs.append((mxdPath,outFolder,name))                  #ddpGrid, mxdFile, outFolder, name
    arcpy.AddMessage("Job list has " + str(len(jobs)) + " elements.")
#    print("Job list has " + str(len(jobs)) + " elements.")

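In the snippet above, the arcpy.AddMessage call is on the same level of indentation as the jobs.append call, so it runs on every pass through the loop.  It also looks like getting the column name will always return the column named "Name"?  Might as well skip that and just use "Name" in the SearchCursor.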

You can also use list comprehension to build your jobs array.

idList = [x[0] for x in arcpy.da.SearchCursor(ddpGrid, "Name")]

jobs = [(mxdPath,outFolder, x[0]) for x in idList] #ddpGrid, mxdFile, outFolder, name
		
arcpy.AddMessage("Job list has " + str(len(jobs)) + " elements.")
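
One thing to watch in the comprehension above: idList already holds plain ID strings, so x[0] in the jobs line would take only the first character of each ID. Here is a minimal sketch of the difference. The parcel IDs and paths are made-up placeholders standing in for the real SearchCursor values, not anything from the actual script:

```python
# Hypothetical stand-ins for the values the real script reads from ddpGrid
idList = ["12-001/A", "12-002", "13-117/B"]
mxdPath = "project.aprx"   # placeholder path
outFolder = "out"          # placeholder folder

# x is already a string here, so x[0] is just its first character
sliced = [(mxdPath, outFolder, x[0]) for x in idList]

# Passing x itself keeps the whole parcel ID
jobs = [(mxdPath, outFolder, x) for x in idList]

print(sliced[0][2])  # 1
print(jobs[0][2])    # 12-001/A
```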

 

To get the results/messages from the process, I used something like:

for mxd, result, status, message in res:
    if result == False:
        arcpy.AddError("{} {} with {}!".format(mxd, status, message))
    else:
        arcpy.AddMessage("{} succeeded!".format(mxd))
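
For context, here is a rough sketch of that parsing step on its own. It assumes each worker returns a (mxd, result, status, message) tuple. The summarize helper and the sample tuples are hypothetical, and print stands in for arcpy.AddError / arcpy.AddMessage so it runs without ArcGIS:

```python
def summarize(res):
    """Split worker results into (failed, succeeded) message lists."""
    failed, succeeded = [], []
    for mxd, result, status, message in res:
        if not result:
            failed.append("{} {} with {}!".format(mxd, status, message))
        else:
            succeeded.append("{} succeeded!".format(mxd))
    return failed, succeeded


# Made-up results in the shape that pool.starmap(worker, jobs) would hand back
res = [
    ("parcel_001", True, "Success", ""),
    ("parcel_002", False, "Failed", "Failed at Line 45  Error: bad page name"),
]

failed, succeeded = summarize(res)
for line in failed + succeeded:
    print(line)
```

Collecting the messages first and reporting them afterwards keeps all of the output in the parent process, where the messages actually show up.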

 

In your worker, I would suggest using one try, except, finally.  The inner tries are not really doing anything.  In your finally, del all the objects that you have open.  Even though the garbage collector would 'do' this, there is no telling whether a failed process leaves things open when an exception is raised and the del is never hit.

Add the globals to the script:
status = 'Failed'
msg = ''

Then, at the end of your processes, add:

		id = mxdPath # or other id that you can use to tell which one it is
		result = True
		status = 'Success'
		msg = ''

	except arcpy.ExecuteError:
		# Geoprocessor threw an error
		msg = arcpy.GetMessages(2)

	except Exception as e:
		tb = sys.exc_info()[2]
		msg = "Failed at Line {}  Error: {}".format(tb.tb_lineno, e)

	finally:
		del pdfDoc, file1, file2, mxdPath, resSketch, mxdFile
		return (id, result, status, msg)
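
One caveat with a del inside finally: if an exception happens before one of those names is assigned, the del itself raises a NameError and the return on the next line is never reached. Below is a minimal sketch of one way around that, with no arcpy involved (FakeDoc and safe_worker are hypothetical stand-ins). The idea is to give every name a default before the try and clean up only what was actually opened:

```python
class FakeDoc:
    """Hypothetical stand-in for a resource such as an open PDF document."""
    open_count = 0

    def __init__(self):
        FakeDoc.open_count += 1

    def close(self):
        FakeDoc.open_count -= 1


def safe_worker(job_id, fail=False):
    # Defaults are set before the try, so the return value is always defined
    result, status, msg = False, "Failed", ""
    doc = None
    try:
        doc = FakeDoc()
        if fail:
            raise ValueError("simulated export error")
        result, status = True, "Success"
    except Exception as e:
        msg = "Error: {}".format(e)
    finally:
        # Only close what was opened; doc is still None if FakeDoc() itself failed
        if doc is not None:
            doc.close()
    return (job_id, result, status, msg)


print(safe_worker("A"))
print(safe_worker("B", fail=True))
```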
		

 

Last thought for now: with so many PDFs in the queue, maybe you should break them up into ranges so the cores don't get 75k processes all at once.  Maybe use the count of jobs to set up an index range to work through the jobs list 25-50 at a time.  How long does one take to export?
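
The batching idea could look something like this. It is only a sketch: chunked is a hypothetical helper, the batch size of 50 comes from the 25-50 range mentioned above, and the job tuples are made up.

```python
def chunked(jobs, size):
    """Yield consecutive slices of jobs, each with at most `size` items."""
    for start in range(0, len(jobs), size):
        yield jobs[start:start + size]


# Made-up job tuples in the (mxdPath, outFolder, name) shape used above
jobs = [("project.aprx", "out", "parcel_{}".format(i)) for i in range(120)]

for batch in chunked(jobs, 50):
    # Each batch could be handed to its own pool.starmap(worker, batch) call
    print(len(batch))
```

With 120 jobs and a batch size of 50, this prints 50, 50 and 20.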

These snippets might need some work to fit into your script.

Kara_Shindle
Occasional Contributor III

Thank you for all of your help - I'm working through your suggestions.  I did end up having to modify the jobs line in the worker script as it was slicing the Parcel IDs. 

So far I've implemented the list comprehension and am working through the errors from the changes to the worker script.  I haven't gotten the error handling to work properly yet, and adding the "in_memory" delete hasn't shown an improvement in time / memory for smaller batches, but I'm hopeful for tonight's run on a larger batch of about 3,000 cards.

JeffK
Occasional Contributor III

Looks like the post I did for the worker script didn't make it to the board...  I worked through the worker script and modified it to iterate over a list of pages.  This way you have a DRYer script and better control over when things are opened and closed.  I also combined the conditionals so it will skip a page if they are not met, without creating unnecessary variables.  I can't remember if the memory space is available for multithreading, but if it is I would suggest writing these outputs to memory and then writing the final PDF to disk.  Writing to disk 3+ times per iteration, 75k times over, could be expensive.

Edit: If the memory space is available, you could make a class that would have properties for each page and sketch, and then when the script is done for that parcel, call a method to iterate over the properties and append them into one PDF on disk.

Hope this gives you some ideas.

 

import os, sys
import arcpy


def fileAppendandCleanup(finalPDF, pagePDF, removePage=True):
	"""
	Helper function to append a PDF to the final PDF and clean up the leftovers.
	The final PDF is only opened when there is something to append.
	"""
	pdfDoc = arcpy.mp.PDFDocumentOpen(finalPDF)
	pdfDoc.appendPages(pagePDF)
	pdfDoc.saveAndClose()
	del pdfDoc
	if removePage: # exported pages are temporary, existing sketch PDFs are not
		os.remove(pagePDF)


def worker(mxdPath, outFolder, name):
	"""
		This is the function that gets called and does the work of exporting the DDP for the UPI specified.
		Note that this function does not try to write to arcpy.AddMessage() as nothing is ever displayed.
		It returns a tuple of (mxdid, result, status, msg) for the calling script to parse.
	"""
	# I had to move this into the worker function for some reason when I did my script
	arcpy.env.overwriteOutput = True

	# set the defaults before the try so the return always has something to report
	mxdid = os.path.splitext(os.path.basename(mxdPath))[0] # project file name without the extension
	result, status, msg = False, 'Failed', ''
	mxdFile = ms = None
	try:
		mxdFile = arcpy.mp.ArcGISProject(mxdPath)
		cleanName = name.replace("/", "_")
		finalPDF = os.path.join(outFolder, cleanName + ".pdf")

		for page in ['Page_1', 'Page_2', 'Page_3']:
			# get the layout for the page and skip it if the map series is missing or not enabled
			l = mxdFile.listLayouts(page)[0]
			if l.mapSeries is None or not l.mapSeries.enabled:
				continue
			ms = l.mapSeries
			ms.currentPageNumber = ms.getPageNumberFromName(name)

			if page == 'Page_1':
				# the first page becomes the final PDF that the other pages are appended to
				ms.exportToPDF(finalPDF, "CURRENT", resolution=300)
				continue
			# the third page is only exported when a building sketch exists on disk
			if page == 'Page_3' and not (ms.pageRow.SKETCH and os.path.exists(ms.pageRow.SKETCH)):
				continue
			##
			## Could the page be written to memory instead of a physical path each time?
			##
			pagePDF = os.path.join(outFolder, cleanName + "t" + page[-1] + ".pdf")
			ms.exportToPDF(pagePDF, "CURRENT", resolution=300)
			fileAppendandCleanup(finalPDF, pagePDF)

		#Checks for existence of Sketch PDF & appends it to new PDF
		if ms is not None and ms.pageRow.COMMERCIAL_SKETCH:
			fileAppendandCleanup(finalPDF, ms.pageRow.COMMERCIAL_SKETCH, removePage=False)

		result = True
		status = 'Success'

	except arcpy.ExecuteError:
		# Geoprocessor threw an error
		msg = arcpy.GetMessages(2)

	except Exception as e:
		tb = sys.exc_info()[2]
		msg = "Failed at Line {}  Error: {}".format(tb.tb_lineno, e)

	finally:
		# release the project even when an exception was raised
		del mxdFile

	return (mxdid, result, status, msg)

 

 

JoshuaBixby
MVP Esteemed Contributor

I have done a handful of Python multiprocessing projects using ArcPy and various geoprocessing tools, and one thing I learned early on is to not let your workers in the pool run indefinitely, i.e., set a limit to the number of tasks a worker can run and then refresh the pool with new workers.

From: multiprocessing — Process-based parallelism — Python 3.9.1 documentation

maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.

For reasons I don't fully understand, and I doubt Esri will invest in addressing, geoprocessing tools that run over and over with a worker tend to slow down after time, and that slowness is worse than linear.  Destroying workers and creating new ones does have overhead, but I have found that overhead is much less than whatever slowness builds with the geoprocessing tools over time.

The trick is balancing how many tasks per worker before killing it.  I usually start with somewhere between 6 and 10.
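
As a rough illustration of the setting, here is a small sketch using only the standard library. fake_export is a hypothetical stand-in for the real ArcPy worker, and the limit of 8 tasks per worker is just a starting point from the range above, not a tuned value:

```python
import multiprocessing
import os


def fake_export(name):
    """Hypothetical stand-in for the real worker; reports which process ran it."""
    return (name, os.getpid())


def run_jobs(names, processes=2, tasks_per_worker=8):
    # maxtasksperchild retires each worker after that many tasks and starts a fresh one
    with multiprocessing.Pool(processes=processes,
                              maxtasksperchild=tasks_per_worker) as pool:
        # chunksize=1 so every job counts as one task toward the limit
        return pool.map(fake_export, names, chunksize=1)


if __name__ == "__main__":
    names = ["parcel_{}".format(i) for i in range(40)]
    results = run_jobs(names)
    pids = {pid for _, pid in results}
    print("{} jobs ran in {} different worker processes".format(len(results), len(pids)))
```

Counting the distinct process IDs is a quick way to confirm that workers really are being replaced as the run goes on.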

Kara_Shindle
Occasional Contributor III

That's a thought - I had experimented with that when I was doing about 500 and didn't notice a difference, but I haven't tried it on a larger batch yet.  Will give it a go too.
