
Multi-Processing OD Matrix with arcpy.nax

ConorOBrien
New Contributor

Hi All,

I'm trying to automate an OD cost matrix process using arcpy.nax with multiprocessing, but I keep running into a BrokenProcessPool error when I set up the worker pool.

I have set up a unique job geodatabase to write each job's outputs into, and built a parameter list (shown below) to submit to a ProcessPoolExecutor.


jobs = [{
    'Name': ...,           # job name (str)
    'Network': ...,        # path to network dataset (str), shared across jobs
    'Type': ...,           # type of job (part of the name), plain str
    'Job Path': ...,       # path to the job's unique geodatabase
    'Origins': ...,        # path to origins FC in the GDB shared across jobs
    'Destinations': ...,   # path to destinations FC in the shared GDB
    'Point Barrier': ...,  # path to point barriers FC in the shared GDB
}]

Below is the code that keeps breaking. I have tried adding print statements inside the try/except block, but the BrokenProcessPool error occurs immediately.

import os
import arcpy

def run_od(job):
    try:
        arcpy.na.MakeNetworkDatasetLayer(job['Network'], job["Name"])
        nd_travel_modes = arcpy.na.GetTravelModes(job["Name"])
        travel_mode = nd_travel_modes["TruckTravelTime"]

        odcm = arcpy.nax.OriginDestinationCostMatrix(job['Network'])
        odcm.travelMode = travel_mode
        odcm.lineShapeType = arcpy.nax.LineShapeType.NoLine

        pbarriers_FM = odcm.fieldMappings(arcpy.nax.OriginDestinationCostMatrixInputDataType.PointBarriers)
        pbarriers_FM['BarrierType'].mappedFieldName = 'TYPE'
        pbarriers_FM['Additional_Time'].mappedFieldName = 'WAIT'

        FM = field_mapping(odcm, job['Type'])  # helper defined elsewhere; returns (origins, destinations) field mappings

        odcm.load(arcpy.nax.OriginDestinationCostMatrixInputDataType.Origins, job['Origins'], FM[0])
        odcm.load(arcpy.nax.OriginDestinationCostMatrixInputDataType.Destinations, job['Destinations'], FM[1])
        odcm.load(arcpy.nax.OriginDestinationCostMatrixInputDataType.PointBarriers, job['Point Barrier'], pbarriers_FM)

        print(f"Solve OD {job['Name']}")
        result = odcm.solve()
        output_lines = os.path.join(job["Job Path"], f"OD_{job['Name']}")
        if arcpy.Exists(output_lines):
            arcpy.management.Delete(output_lines)
        print("Export OD to Lines")
        result.export(arcpy.nax.OriginDestinationCostMatrixOutputDataType.Lines, output_lines)
        return f"Success - {job['Name']}"
    except Exception as e:
        return f"Failed - {job['Name']}"

multiprocessing.set_start_method("spawn", force=True)
with ProcessPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(run_od, j) for j in jobs]
    for future in as_completed(futures):
        print(future.result())

Running the run_od function sequentially across jobs works; only the multiprocessing step raises the error. I have not changed the parallelProcessingFactor environment setting, and I don't know whether that helps or hinders the process with arcpy.
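For reference, here is a stripped-down, stand-alone version of the pattern I'm using, with run_od stubbed out to just echo the job name. Note the if __name__ == "__main__": guard, which the spawn start method requires when this runs as a script (each worker re-imports the module, so an unguarded pool setup gets re-executed in every worker):

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed

def run_od(job):
    # Stand-in for the real solve/export logic; just echoes the job name.
    return f"Success - {job['Name']}"

if __name__ == "__main__":
    jobs = [{'Name': 'A'}, {'Name': 'B'}]  # placeholder job list
    multiprocessing.set_start_method("spawn", force=True)
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(run_od, j) for j in jobs]
        for future in as_completed(futures):
            print(future.result())
```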

Any help on this would be greatly appreciated.

Thanks,

Conor

4 Replies
HaydenWelch
MVP Regular Contributor

Multiprocessing can be a pain with arcpy because of the license checks that run when it's imported into an environment.

If you're only licensed for one seat, you can get license errors on subsequent processes.

Would you mind sharing the error message?

ConorOBrien
New Contributor

Hi Hayden,
Sure thing, see error message below. I get it immediately after hitting run on the code block.

Traceback (most recent call last):
  File "C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\multiprocessing\queues.py", line 246, in _feed
    send_bytes(obj)
  File "C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\multiprocessing\connection.py", line 184, in send_bytes
    self._check_closed()
  File "C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\multiprocessing\connection.py", line 137, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed

---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
Cell In[86], line 5
      3 futures = [executor.submit(run_od, j) for j in jobs]
      4 for future in as_completed(futures):
----> 5     print(future.result())

File C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\concurrent\futures\_base.py:449, in Future.result(self, timeout)
    447     raise CancelledError()
    448 elif self._state == FINISHED:
--> 449     return self.__get_result()
    451 self._condition.wait(timeout)
    453 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\Lib\concurrent\futures\_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

 

HaydenWelch
MVP Regular Contributor

Try re-raising the Exception you're catching in the run function. The way you currently have it written, the actual exception is being dropped.
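Something like this, with do_solve standing in as a hypothetical placeholder for the arcpy work in your function:

```python
def do_solve(job):
    # Hypothetical stand-in for the arcpy solve/export logic;
    # here it always fails so the pattern is visible.
    raise RuntimeError("license not available")

def run_od(job):
    try:
        do_solve(job)
        return f"Success - {job['Name']}"
    except Exception as e:
        print(f"Failed - {job['Name']}: {e}")
        raise  # re-raise so the real traceback reaches future.result()
```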

I have a suspicion that the silenced exception is a RuntimeError that's being thrown because of the license. Only because I ran into this exact same issue the other day lol.

Not totally sure if there's a guaranteed workaround, but you could try taking the license offline, adding a warm-up delay so each process has time to acquire the license after another drops the lock, or switching to a pooled/multi-user license if that's available through the Pro licensing interface.
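For the warm-up delay, a minimal sketch: the delay bounds here are a guess, so tune them to however long your license checkout actually takes.

```python
import random
import time

def license_warmup(max_delay: float = 5.0) -> float:
    """Hypothetical warm-up: sleep a random interval before a worker's
    first task so concurrent license checkouts are less likely to collide."""
    delay = random.uniform(0.0, max_delay)
    time.sleep(delay)
    return delay

# Passed as the pool initializer, each worker runs it once at startup:
# ProcessPoolExecutor(max_workers=2, initializer=license_warmup)
```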

 

Sidenote: Try not to wrap your whole function in a try/except. It'll make debugging a pain, since you can hide the exception and, in this case, put the program in a bad state that raises an error later (a process hits a RuntimeError and calls sys.exit(1), but you returned a value, so the parent process tries to communicate with something that died silently).

Don't be afraid to allow the function to fail fast, then catch the exception somewhere else. I believe multiprocessing and asyncio both allow you to return exceptions from tasks/processes and handle them later.
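Roughly like this with concurrent.futures, using a toy task as a stand-in for the solve: let it raise, then inspect future.exception() in the parent instead of returning a status string.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def task(n):
    # Hypothetical task: fails fast on bad input instead of swallowing it.
    if n < 0:
        raise ValueError(f"bad input: {n}")
    return n * 2

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(task, n): n for n in [1, -1, 3]}
        for future in as_completed(futures):
            exc = future.exception()  # returns the exception instead of raising it
            if exc is not None:
                print(f"Job {futures[future]} failed: {exc}")
            else:
                print(f"Job {futures[future]} -> {future.result()}")
```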

DanPatterson
MVP Esteemed Contributor

Untried by myself, but are you aware of the network analyst resources in

Esri/large-network-analysis-tools: Tools and code samples for solving large network analysis problem...

There is a section on the Solve Large OD Cost Matrix tool which may be of interest.


... sort of retired...