Hi All,
I'm trying to automate an OD matrix process using arcpy.nax and multiprocessing. I'm running into trouble setting up the multiprocessing side without hitting a BrokenProcessPool error.
I have set up a unique job geodatabase to write each job's outputs into and built a parameter list (shown below, with a sketch of how it's assembled after the list) for a ProcessPoolExecutor.
jobs = [{
    'Name': ...,           # job name (str)
    'Network': ...,        # str path to the network dataset, shared across jobs
    'Type': ...,           # type of job (part of the name), plain str
    'Job Path': ...,       # path to the unique job geodatabase
    'Origins': ...,        # path to the origins FC in the GDB shared across jobs
    'Destinations': ...,   # path to the destinations FC in the GDB shared across jobs
    'Point Barrier': ...,  # path to the point barriers FC in the GDB shared across jobs
}]
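For context, the list is assembled roughly like this (the folder paths, feature class names, and job types below are placeholders, not my actual data):

import os
import arcpy

work_dir = r"C:\temp\od_jobs"          # placeholder folder for the job geodatabases
shared_gdb = r"C:\temp\inputs.gdb"     # placeholder shared input GDB
network = r"C:\temp\network.gdb\Transport\Transport_ND"  # placeholder network dataset

jobs = []
for job_type in ["AM", "PM"]:  # placeholder job types
    name = f"Trucks_{job_type}"
    # Each job gets its own file geodatabase so workers never write to the same place
    job_gdb = os.path.join(work_dir, f"{name}.gdb")
    if not arcpy.Exists(job_gdb):
        arcpy.management.CreateFileGDB(work_dir, f"{name}.gdb")
    jobs.append({
        "Name": name,
        "Network": network,
        "Type": job_type,
        "Job Path": job_gdb,
        "Origins": os.path.join(shared_gdb, "Origins"),
        "Destinations": os.path.join(shared_gdb, "Destinations"),
        "Point Barrier": os.path.join(shared_gdb, "PointBarriers"),
    })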
Below is the code that keeps breaking. I have tried adding print statements inside the try/except block, but the BrokenProcessPool error occurs immediately.
import os
import arcpy

def run_od(job):
    try:
        # Make a network dataset layer so the travel modes can be read from it
        arcpy.na.MakeNetworkDatasetLayer(job['Network'], job['Name'])
        nd_travel_modes = arcpy.na.GetTravelModes(job['Name'])
        travel_mode = nd_travel_modes["TruckTravelTime"]

        # Configure the OD cost matrix solver
        odcm = arcpy.nax.OriginDestinationCostMatrix(job['Network'])
        odcm.travelMode = travel_mode
        odcm.lineShapeType = arcpy.nax.LineShapeType.NoLine

        # Field mappings for the point barriers
        pbarriers_FM = odcm.fieldMappings(arcpy.nax.OriginDestinationCostMatrixInputDataType.PointBarriers)
        pbarriers_FM['BarrierType'].mappedFieldName = 'TYPE'
        pbarriers_FM['Additional_Time'].mappedFieldName = 'WAIT'

        # field_mapping is a helper defined elsewhere in my script; it returns the
        # origins and destinations field mappings for the given job type
        FM = field_mapping(odcm, job['Type'])
        odcm.load(arcpy.nax.OriginDestinationCostMatrixInputDataType.Origins, job['Origins'], FM[0])
        odcm.load(arcpy.nax.OriginDestinationCostMatrixInputDataType.Destinations, job['Destinations'], FM[1])
        odcm.load(arcpy.nax.OriginDestinationCostMatrixInputDataType.PointBarriers, job['Point Barrier'], pbarriers_FM)

        print(f"Solve OD {job['Name']}")
        result = odcm.solve()

        # Export the OD lines into this job's own geodatabase
        output_lines = os.path.join(job["Job Path"], f"OD_{job['Name']}")
        if arcpy.Exists(output_lines):
            arcpy.management.Delete(output_lines)
        print("Export OD to Lines")
        result.export(arcpy.nax.OriginDestinationCostMatrixOutputDataType.Lines, output_lines)
        return f"Success - {job['Name']}"
    except Exception as e:
        return f"Failed - {job['Name']}"
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed

multiprocessing.set_start_method("spawn", force=True)
with ProcessPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(run_od, j) for j in jobs]
    for future in as_completed(futures):
        print(future.result())
Running the run_od function sequentially across the jobs works; it is only the multiprocessing step that raises the error. I have not touched the parallel processing factor environment setting, and I don't know whether that helps or hinders arcpy here (see the snippet below for what I mean).
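For reference, this is the environment setting I mean; I have left it untouched (the value shown is just an example, not something I have set):

import arcpy

# Example only: "0" tells tools that honour this setting not to use their own parallelism
arcpy.env.parallelProcessingFactor = "0"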
Any help on this would be greatly appreciated.
Thanks,
Conor
Multiprocessing can be a pain with arcpy because of the license checks that run each time it is imported in a new process.
If you're only licensed for a single seat, you can get license errors in the subsequent worker processes.
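As a quick check (just a sketch, not something I've run against your setup), you could have each worker process import arcpy and report which license it actually sees before you throw the real jobs at it:

from concurrent.futures import ProcessPoolExecutor, as_completed

def check_license(worker_id):
    # Import arcpy inside the worker so the license check happens per process
    import arcpy
    return (f"Worker {worker_id}: product={arcpy.ProductInfo()}, "
            f"network_ext={arcpy.CheckExtension('Network')}")

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(check_license, i) for i in range(2)]
        for f in as_completed(futures):
            print(f.result())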
Would you mind sharing the error message?
Hi Hayden,
Sure thing, see error message below. I get it immediately after hitting run on the code block.
Traceback (most recent call last):
File "C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\multiprocessing\queues.py", line 246, in _feed
send_bytes(obj)
File "C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\multiprocessing\connection.py", line 184, in send_bytes
self._check_closed()
File "C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\multiprocessing\connection.py", line 137, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
---------------------------------------------------------------------------
BrokenProcessPool Traceback (most recent call last)
Cell In[86], line 5
3 futures = [executor.submit(run_od, j) for j in jobs]
4 for future in as_completed(futures):
----> 5 print(future.result())
File C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\concurrent\futures\_base.py:449, in Future.result(self, timeout)
447 raise CancelledError()
448 elif self._state == FINISHED:
--> 449 return self.__get_result()
451 self._condition.wait(timeout)
453 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
File C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\concurrent\futures\_base.py:401, in Future.__get_result(self)
399 if self._exception:
400 try:
--> 401 raise self._exception
402 finally:
403 # Break a reference cycle with the exception in self._exception
404 self = None
BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Try re-raising the exception you're catching in the run function. The way it's currently written, the actual exception is being dropped.
I have a suspicion that the silenced exception is a RuntimeError that's being thrown because of the license. Only because I ran into this exact same issue the other day lol.
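Something along these lines (untested sketch) keeps your log message but still surfaces the real error to the parent process:

def run_od(job):
    try:
        # ... your existing solver setup, solve, and export ...
        return f"Success - {job['Name']}"
    except Exception:
        # Keep the log line, but re-raise so future.result() shows the real traceback
        print(f"Failed - {job['Name']}")
        raise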
Not totally sure there's a guaranteed workaround, but you could try taking the license offline, adding a warm-up delay so each process has time to acquire the license after the other drops the lock (rough idea sketched below), or switching to a pooled/multi-user license if that's available through the Pro licensing interface.
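For the warm-up idea, something like a pool initializer that retries until arcpy imports cleanly might do it (untested, and the retry count and delay are arbitrary):

import time
from concurrent.futures import ProcessPoolExecutor

def warmup(retries=5, delay=10):
    # Give each worker a few attempts to acquire a Pro license before real work starts
    for _ in range(retries):
        try:
            import arcpy
            if arcpy.ProductInfo() != "NotInitialized":
                return
        except RuntimeError:
            pass  # import can raise RuntimeError when no license is free yet
        time.sleep(delay)

# Usage: ProcessPoolExecutor(max_workers=2, initializer=warmup)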
Sidenote: try not to wrap your whole function body in a single try/except. It makes debugging a pain, since you can hide the exception and, as in this case, leave the program in a bad state that errors later (the worker hits a RuntimeError and effectively sys.exit(1)s, but because you returned a value the parent process tries to communicate with something that died silently).
Don't be afraid to let the function fail fast and catch the exception somewhere else. I believe multiprocessing and asyncio both let you retrieve exceptions from tasks/processes and handle them later.
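With concurrent.futures, for example, the exception propagates through the future and you can handle it in the parent loop (sketch based on the structure in your post):

from concurrent.futures import ProcessPoolExecutor, as_completed

with ProcessPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(run_od, j): j["Name"] for j in jobs}
    for future in as_completed(futures):
        name = futures[future]
        try:
            print(future.result())  # re-raises whatever run_od raised in the worker
        except Exception as exc:
            print(f"Failed - {name}: {exc!r}")  # handled in the parent, full details intact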
Untried by myself, but are you aware of the Network Analyst resources? There is a section on the Solve Large OD Cost Matrix tool which may be of interest.