
A Fun Threading Situation With da.Walk

3 weeks ago
HaydenWelch
MVP Regular Contributor

So I have some library code that handles loading file databases and indexing the contained data. These are then grouped into dictionaries, so I iteratively use da.Walk to extract each datatype from the dataset.

This can of course be pretty slow, especially when loading in a database that's on a network fileserver. No problem though, this is something that can be solved pretty simply by creating some threads using concurrent.futures.ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor, as_completed
from arcpy.da import Walk
from pathlib import Path

def walk(ds: str, dtype: str | None = None):
    """walk a dataset filtering on the supplied datatype"""
    paths: list[Path] = []
    for root, _, items in Walk(ds, datatype=dtype):
        for itm in items:
            paths.append(Path(root)/itm)
    return paths

def extract_types(ds: str, dtypes: list[str]) -> dict[str, list[Path]]:
    """Extract paths from a dataset grouped by type"""
    data: dict[str, list[Path]] = {}
    with ThreadPoolExecutor(max_workers=len(dtypes)) as executor:
        futures = {executor.submit(walk, ds, dtype): dtype for dtype in dtypes}
        for future in as_completed(futures):
            data[futures[future]] = future.result()
    return data

 

Seems simple enough: spool up one thread per Walk call and await the results so they run concurrently. Let's run it:

>>> extract_types("My_GDB", ['FeatureClass', 'Table'])
{'FeatureClass': [], 'Table': []}

 

Hmmm. There's no output, but there are definitely both tables and feature classes in that gdb... Let's try a synchronous extract method:

def extract_types_sync(ds: str, dtypes: list[str]) -> dict[str, list[Path]]:
    """Extract paths from a dataset grouped by type"""
    return {
        dtype: walk(ds, dtype)
        for dtype in dtypes
    }

 

And run that:

>>> extract_types_sync("My_GDB", ['FeatureClass', 'Table'])
{'FeatureClass': [Path("My_GDB/FC1"), Path("My_GDB/FC2")], 
'Table': [Path("My_GDB/Table1"), Path("My_GDB/Table2")]}

 

Okay, so there IS data in the database, and Walk is able to find it. Let's try the concurrent version again:

>>> extract_types("My_GDB", ['FeatureClass', 'Table'])
{'FeatureClass': [Path("My_GDB/FC1"), Path("My_GDB/FC2")], 
'Table': [Path("My_GDB/Table1"), Path("My_GDB/Table2")]}

 

So now the concurrent version is able to find the data, but only **after** running a Walk synchronously? This little bug persists across interpreter sessions, it seems. So let's see if warming up the Walk function fixes it:

def extract_types(ds: str, dtypes: list[str]) -> dict[str, list[Path]]:
    """Extract paths from a dataset grouped by type"""
    for _ in Walk(ds): break
    data: dict[str, list[Path]] = {}
    with ThreadPoolExecutor(max_workers=len(dtypes)) as executor:
        futures = {executor.submit(walk, ds, dtype): dtype for dtype in dtypes}
        for future in as_completed(futures):
            data[futures[future]] = future.result()
    return data

 

And run one more time:

>>> extract_types("My_GDB", ['FeatureClass', 'Table'])
{'FeatureClass': [Path("My_GDB/FC1"), Path("My_GDB/FC2")], 
'Table': [Path("My_GDB/Table1"), Path("My_GDB/Table2")]}

 

Now it works! This is really odd though. I'm guessing that da.Walk relies on some global state that isn't initialized in a sub-thread and must be initialized in the main thread first. It's definitely odd behavior, and I figured I'd share it here in case anyone else runs into it. I'm also curious how this pattern will behave once 3.14 is adopted and we have access to InterpreterPoolExecutor. Will the arcpy global state need to be shared even for functions as simple as da.Walk?
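If anyone wants to see the failure shape without arcpy, here's a toy model of the hypothesis: a hypothetical library whose lazy setup only succeeds on the main thread. This is a guess at the mechanism, not arcpy's actual internals.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a library whose first call does main-thread-only setup.
# (Hypothetical: arcpy's real initialization is undocumented.)
_initialized = False

def _lazy_setup():
    global _initialized
    # Pretend this setup only succeeds on the main thread.
    if threading.current_thread() is threading.main_thread():
        _initialized = True

def fragile_walk(item: str) -> list[str]:
    _lazy_setup()
    # Returns nothing until the library state has been initialized.
    return [item] if _initialized else []

# Cold call from worker threads: empty results, mirroring the bug.
with ThreadPoolExecutor(max_workers=2) as ex:
    cold = [f.result() for f in [ex.submit(fragile_walk, "a"), ex.submit(fragile_walk, "b")]]

# Warm up on the main thread once; now the worker threads see real results.
fragile_walk("warmup")
with ThreadPoolExecutor(max_workers=2) as ex:
    warm = [f.result() for f in [ex.submit(fragile_walk, "a"), ex.submit(fragile_walk, "b")]]

print(cold, warm)  # [[], []] [['a'], ['b']]
```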

1 Solution

Accepted Solutions
JoshuaBixby
MVP Esteemed Contributor

OK, I think I finally understand (or mostly) what is going on with arcpy.da.Walk and multithreading. As you know, I created some synthetic datasets to represent a range of GDB structures, and have been testing those datasets on local SSD, LAN SMB (~1.25 ms), and WAN SMB (~20 ms).

arcpy.da.Walk is an os.walk-shaped generator — one __next__() per directory — and it holds some in-process lock during each __next__() call, releasing between yields.  A probe driving two Walk generators from two threads (thread A, thread B) showed the two threads' iterations alternate perfectly in lockstep (B, A, B, A, ...) on a gdb with feature datasets, with ~80% wall-clock overlap.  So threads do get scheduled and do share the work, but they share it by taking turns at the lock, not by running truly in parallel.  That puts a ceiling on how much threading can help.  Total lock-holding time is bounded below by total work, so two threads can't beat one by much.
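A toy model of that turn-taking (pure Python, no arcpy): a generator that holds a shared lock during each step, which is my inference from the probe, not documented behavior. Two threads each finish all their work, but never inside the lock at the same time.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()   # stand-in for the in-process lock Walk appears to hold
_holders = 0
max_holders = 0            # proves the lock serializes the per-step work

def locked_walk(n):
    """Generator that holds a shared lock during each __next__, releasing
    between yields, like the probed behavior of da.Walk."""
    global _holders, max_holders
    for i in range(n):
        with _lock:
            _holders += 1
            max_holders = max(max_holders, _holders)
            _holders -= 1
        yield i

def drain(n):
    # Consume every step of one generator, like one thread walking one dtype
    return sum(1 for _ in locked_walk(n))

with ThreadPoolExecutor(max_workers=2) as ex:
    counts = [f.result() for f in (ex.submit(drain, 50), ex.submit(drain, 50))]

# Both threads complete all 50 steps, but only one ever holds the lock.
assert counts == [50, 50] and max_holders == 1
```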

The practical consequence is that how much threading wins depends on your gdb's directory structure.  A flat gdb is one directory with one big yield, so two threads can't interleave at all.  This means the second thread just waits until the first is done.  A gdb with many feature datasets has many yield points and the two threads can rotate through the lock productively, which is probably where some of your time savings is coming from in your tests. In my testing, ProcessPoolExecutor gave a more predictable ~1.6-1.9× speedup at WAN latency regardless of gdb shape, because separate processes have separate arcpy state and don't share the lock at all. The tradeoff is the ~3s cost of spinning up each worker (fresh arcpy import, license check), which is fine if each gdb walk takes more than a few seconds but eats the benefit on fast walks. If you're walking many gdbs over a WAN, the biggest time savings is probably one process per gdb rather than threading within one gdb — that axis scales cleanly and doesn't depend on what any individual gdb looks like.
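To put rough numbers on the process-pool tradeoff, here is a back-of-the-envelope check. The 3 s startup and ~1.7× speedup are the ballpark figures from my tests, not constants you should rely on.

```python
def process_pool_wins(walk_seconds: float, startup: float = 3.0,
                      speedup: float = 1.7) -> bool:
    """Rough break-even check: is startup overhead plus the parallelized
    walk time less than just walking sequentially?"""
    # Workers spin up concurrently, so the startup cost is paid once.
    return startup + walk_seconds / speedup < walk_seconds

print(process_pool_wins(5.0))    # short walk: overhead eats the benefit
print(process_pool_wins(40.0))   # long WAN walk: parallelism wins
```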


12 Replies
JoshuaBixby
MVP Esteemed Contributor

Given Esri makes no statements about which modules, DLLs, etc. are thread safe, the safe bet is to assume ArcPy is not thread safe. The initialization quirk you found using arcpy.da.Walk with a ThreadPoolExecutor is both an example and an indicator that arcpy.da.Walk is not completely thread safe. Although you did find a workaround to initialize arcpy.da.Walk, I am not sure the benefits of multi-threading (if any) outweigh the risks of running non-thread-safe code in multiple threads.

Have you tried doing sequential calls and compared performance?  The internal caching that the libraries behind arcpy.da.Walk use should result in fairly performant sequential calls.

HaydenWelch
MVP Regular Contributor

The limiting factor is the speed of the NAS. The parallel walks end up being okay since they're only reading. Technically there could be issues if the database being scanned is modified while the walks are executing, but that's a price I'm willing to pay since at the end of this all I have is a bunch of Path objects.

The previous sync version would walk a database in ~30 seconds, but this one is consistently between 12 and 15 seconds. Local indexing (not limited by network speed) is ~1-2 seconds, the same as the sync version. The first call is slowest since we have to hydrate the Walk function, but subsequent calls are at least ~10% faster. I got around this by hydrating Walk when the module is imported, so I don't have to pay that initialization cost every time I call the function.

I'm definitely going to keep an eye on it for the next few weeks, but for now it's been running smoothly.

Sidenote: two things that DO seem to be thread safe (as far as I can tell) are management.Compress and management.Compact, which is a massive speedup when archiving old databases, depending on how many threads you want to spool up.
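The archiving pattern is the same executor shape as the walk code. Sketched here with a stand-in for arcpy.management.Compact, since the real call needs an ArcGIS Python environment:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

def compact(gdb: Path) -> Path:
    """Stand-in for arcpy.management.Compact(str(gdb)); swap in the real
    call when running under ArcGIS. Returns the path it processed."""
    return gdb

gdbs = [Path(f"archive_{i}.gdb") for i in range(4)]
with ThreadPoolExecutor(max_workers=4) as ex:
    # One thread per gdb; collect names as each compact finishes
    done = sorted(f.result().name for f in as_completed(
        {ex.submit(compact, g) for g in gdbs}))

print(done)  # ['archive_0.gdb', 'archive_1.gdb', 'archive_2.gdb', 'archive_3.gdb']
```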

 

EDIT: Here's a screenshot of a Jupyter session where I used the concurrent walks to construct a Dataset object:

HaydenWelch_0-1776281449865.png

You can see that the first call is a bit slower than the subsequent calls, but it finds the same data every time. Anything that uses these objects knows what to expect, so if something is missing it'll throw an error telling me.

If you want to take a look or try it out, the library is open source and the code is available here

JoshuaBixby
MVP Esteemed Contributor

I will have to check out the library on GH. I don't fully understand what the screenshot is showing. Are you just running the same code block in a loop using the same data set(s) and arguments? If so, the file system caching done at the OS level and the caching happening in ArcGIS code is likely influencing the results. For code like this, the cold-call performance is what is important, right? What kind of latency are you dealing with to your network file shares? I know your screenshot is for a test or example database, but are these times slow enough to warrant the time to optimize an approach in the first place?

HaydenWelch
MVP Regular Contributor

When testing on our fileserver, this same database initialization went from 30s to 15s (we have thousands of file databases with ~100 feature classes and ~50 tables each). There is filesystem caching, but the actual Walk call itself is slow, and since I have to call it multiple times to get all the features initialized into the proper types, I previously paid that cost sequentially. Now those 4 walks (FeatureClass, Table, Relationships, FeatureDatasets) run concurrently.

I'm only using walk instead of the List* functions because Walk doesn't rely on the global arcpy.env state which can be finicky when you start changing workspaces rapidly.

The code I shared above has been implemented in the initializer of that Dataset object, so the sequential runs are just showing that it's not having issues missing things or losing data. The cold start is expected of course.

 

Addendum: This code is also in a Python toolbox and is frequently used to scan a database to populate tool parameters or validate parameters, and since tools are re-initialized every time you modify a parameter, the cold call is actually not the limiting factor. Getting average call time down saves a lot. I will still usually create a global cache for a tool, or use the @cached_property or @lru_cache decorators to prevent repeated loads on more expensive operations though.
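For the caching, the main wrinkle is that lru_cache needs hashable arguments, so a dtypes list has to become a tuple. A minimal sketch with a hypothetical stand-in for the real threaded walk:

```python
from functools import lru_cache
from pathlib import Path

@lru_cache(maxsize=32)
def cached_extract(ds: str, dtypes: tuple[str, ...]) -> dict[str, list[Path]]:
    """Stand-in for the expensive extract_types(ds, dtypes) call; in the
    real toolbox the body would do the actual walk. Note dtypes is a
    tuple, not a list, so lru_cache can hash it."""
    return {dtype: [Path(ds) / f"{dtype}_example"] for dtype in dtypes}

first = cached_extract("My_GDB", ("FeatureClass", "Table"))
second = cached_extract("My_GDB", ("FeatureClass", "Table"))
assert first is second                    # second call is a cache hit
print(cached_extract.cache_info().hits)   # 1
```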

JoshuaBixby
MVP Esteemed Contributor

I know there is more to your repo than just getting lists of feature classes and tables from file geodatabases, but when it comes to that task, I am not seeing any benefit most of the time from multi-threading vs. sequential calling, regardless of network latency. The code structure I have been comparing is (da.Walk initialization/workaround handled upstream of this snippet):

# ---------------------------------------------------------------------------
# Shared worker
# ---------------------------------------------------------------------------

def walk(ds: str, dtype: str | None = None) -> list[Path]:
    paths: list[Path] = []
    for root, _, filenames in Walk(ds, datatype=dtype):
        for itm in filenames:
            paths.append(Path(root) / itm)
    return paths


# ---------------------------------------------------------------------------
# The parallel and sequential dispatchers.  Keep these as structurally
# similar as possible.
# ---------------------------------------------------------------------------

def extract_types_threaded(ds: str, dtypes: list[str]) -> dict[str, list[Path]]:
    """One thread per datatype. Matches OP post."""
    data: dict[str, list[Path]] = {}
    with ThreadPoolExecutor(max_workers=len(dtypes)) as executor:
        futures = {executor.submit(walk, ds, dtype): dtype for dtype in dtypes}
        for future in as_completed(futures):
            data[futures[future]] = future.result()
    return data


def extract_types_sequential(ds: str, dtypes: list[str]) -> dict[str, list[Path]]:
    """One call per datatype, main thread only."""
    return {dtype: walk(ds, dtype) for dtype in dtypes}


If you are so inclined and have the time, I posted ArcPy: Generate FGDB Collection as a GitHub gist to create a collection of differently structured file geodatabases for synthetic testing.  I would be interested in what the timings look like for you getting feature class and table lists from these FGDBs on your network file share.  And what is the latency to your network file share from the client running the tests.

What a default run produces

Running generate_fgdb_collection.py with no arguments writes the collection to ./collection/ and builds every profile in the catalog. Each profile produces one .gdb directory plus a sibling .manifest.json for the correctness checker.

Per-profile breakdown

Profile         FCs   Tables   FDs   RCs   Total   Purpose
empty             0        0     0     0       0   Zero-of-everything edge case
tiny              4        2     1     1       8   Sanity, one of each type
flat_small       20       10     0     0      30   Isolates root enumeration
flat_medium     100       50     0     5     155   Forum-thread baseline
nested_medium   100       50     5     5     160   Tests FD recursion
wide_datasets   100        0    20     0     120   Stresses FD count
deep_only        40        0     1     0      41   Single FD holds all
rc_heavy         20       20     0    50      90   Stresses RC enumeration
xl              500      200    10    30     740   Stress tier, largest workload
Total           884      332    37    91   1,344

 

FCs counts all feature classes including those inside feature datasets, so nested profiles roll up. Total is the sum of FCs, tables, FDs, and RCs for that profile; it excludes domains (which Walk does not enumerate). Every FC and table gets five baseline fields (NAME, VALUE, CATEGORY, CREATED, FOREIGN_KEY) plus three deterministic-random extras. All objects are empty; Walk enumerates schema, not rows.
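For anyone wiring the manifests into a correctness checker, something like the following works. The `{"counts": {...}}` schema here is a guess for illustration; adjust the key names to whatever the gist actually writes.

```python
import json
import tempfile
from pathlib import Path

def check_counts(manifest_path: Path, walked: dict[str, list]) -> list[str]:
    """Compare per-type counts from a walk against a manifest.
    Assumes the manifest stores {"counts": {dtype: int, ...}} -- a
    hypothetical schema for this sketch."""
    expected = json.loads(manifest_path.read_text())["counts"]
    return [f"{dtype}: expected {n}, walked {len(walked.get(dtype, []))}"
            for dtype, n in expected.items()
            if len(walked.get(dtype, [])) != n]

with tempfile.TemporaryDirectory() as tmp:
    mf = Path(tmp) / "tiny.manifest.json"
    mf.write_text(json.dumps({"counts": {"FeatureClass": 4, "Table": 2}}))
    ok = check_counts(mf, {"FeatureClass": ["a", "b", "c", "d"], "Table": ["t1", "t2"]})
    bad = check_counts(mf, {"FeatureClass": ["a"], "Table": ["t1", "t2"]})

print(ok, bad)  # [] ['FeatureClass: expected 4, walked 1']
```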

HaydenWelch
MVP Regular Contributor

I'll give this a shot tomorrow. It's very possible that our database schema or network latency is to blame, but I definitely got a speedup with the threaded version when targeting our databases (there are also ~450 attribute rules in each one, which may be causing some weird slowdown).

If you don't mind, I might end up adding this gist to the repo testing area so I can use it for general performance benchmarking.

JoshuaBixby
MVP Esteemed Contributor

Feel free to recycle the gist however you please.  There is something odd, odd in the sense that I can't explain it yet, with how da.Walk is working with multi-threading.  I spent several hours yesterday devising different tests, and I may be finally getting close to isolating why the multi-threaded da.Walk results aren't as robust as one would expect.  

HaydenWelch
MVP Regular Contributor

I just ran ~2 hours of tests and ended up with some incredibly puzzling results. You are correct that on average sequential reads are faster, but there seem to be some odd edge cases where the threaded version is SIGNIFICANTLY faster:

Dataset                Local Sequential   Local Threaded   % Change
empty                   0.98s              1.64s           +67.24%
tiny                    5.51s              3.69s           -32.98%
flat_small              3.43s              4.04s           +17.87%
flat_medium            14.45s             14.95s            +3.39%
nested_medium          10.74s             12.30s           +14.52%
wide_datasets          62.70s             22.75s           -63.71%
deep_only               1.66s              2.59s           +56.67%
rc_heavy                5.48s              5.14s            -6.31%
xl                     85.45s             66.31s           -22.39%
populated_production   14.40s             12.34s           -14.30%

 

Dataset                Network Sequential   Network Threaded   % Change
empty                    4.54s                8.20s            +80.52%
tiny                    38.57s               25.71s            -33.34%
flat_small              25.83s               29.46s            +14.04%
flat_medium            109.82s              116.67s             +6.24%
nested_medium           90.18s               97.47s             +8.09%
wide_datasets          446.37s              173.41s            -61.15%
deep_only               18.79s               23.68s            +26.03%
rc_heavy                44.38s               38.43s            -13.40%
xl                     677.89s              491.07s            -27.56%
populated_production   242.95s              161.28s            -33.62%

 

As you can see, our network is incredibly slow for some reason (the company IT department rolled out some RMM recently that may very well be causing all sorts of issues with IOPS).

All data was gathered using the python timeit module with 10 loops of each run. I added a flag to the Dataset constructor that switches the walk method between the threaded and sequential versions:

from pathlib import Path
from timeit import timeit
from arcpie import Dataset

LOCAL = Path.home() / 'collection'
NETWORK = Path(r"S:\Test\collection")

# production_populated gdb was added manually to both collections
local_datasets = {ds.name: ds for ds in LOCAL.glob('*.gdb')}
network_datasets = {ds.name: ds for ds in NETWORK.glob('*.gdb')}

tests = {
    'local threaded': 'Dataset(local_ds, _threaded=True)',
    'local sequential': 'Dataset(local_ds, _threaded=False)',
    'network threaded': 'Dataset(network_ds, _threaded=True)',
    'network sequential': 'Dataset(network_ds, _threaded=False)',
}

results = {}
number = 10

for ds_name in local_datasets:
    local_ds = local_datasets[ds_name]
    network_ds = network_datasets[ds_name]
    # Warmup (hydrates Walk and primes filesystem caches)
    _ = timeit(stmt='Dataset(local_ds)', number=1, globals=globals())

    for name, test in tests.items():
        res = timeit(stmt=test, number=number, globals=globals())
        results[ds_name, name] = res  # key by dataset too, so runs don't clobber each other
        print(f'{ds_name} {name}: {res:0.2f}s')

 

EDIT: So I've tried another method that seems to work pretty well too. It's consistent across all environments and is always at least as fast as the fastest method above: I just read the a00000004.gdbtable file directly:

from collections import defaultdict
from pathlib import Path

TAG_MAP = {
    'FeatureDataset': b'<DEFeatureDataset',
    'FeatureClass': b'<DEFeatureClassInfo',
    'Table': b'<DETableInfo',
    'RelationshipClass': b'<DERelationshipClassInfo',
}
PATH_TAGS = b'<CatalogPath>', b'</CatalogPath>'
# Currently NetworkDataset topologies are captured as FeatureClasses.
# This can be used to filter them out, but it's not critical
TYPE_TAGS = b'<DatasetType>', b'</DatasetType>'
TABLE_FILE = 'a00000004.gdbtable'

# Read the raw table info from the a00000004 system table
def _extract_types_a00000004(ds: Path | str, dtypes: list[str]) -> dict[str, list[Path]]:
    a4_table = Path(ds) / TABLE_FILE
    data: dict[str, list[Path]] = defaultdict(list)
    with a4_table.open('rb') as a4_file:
        for line in a4_file.readlines()[1:]:
            types_on_line: list[tuple[str, bytes]] = []
            for dtype in dtypes:
                if TAG_MAP[dtype] in line:
                    types_on_line.append((dtype, TAG_MAP[dtype]))
            for dtype, open_tag in types_on_line:
                # Grab the text between the CatalogPath tags, drop the leading backslash
                catalog_path = line.split(open_tag)[1].split(PATH_TAGS[0])[1].split(PATH_TAGS[1])[0].decode()[1:]
                if catalog_path.endswith('.gdb'):
                    continue
                data[dtype].append(Path(ds) / catalog_path)
    return data

 

This is a very rudimentary implementation that could use some optimization, but if the Walk function is a black box, why not get the data straight from the source? No need to even parse the XML, since each table entry is on one line: just match the open tag.
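To see the tag matching in isolation, here's the same split chain run on a synthetic line. These are fake bytes shaped like the description above, not a real gdbtable row:

```python
# Synthetic single "row" mimicking the one-entry-per-line layout described
# above -- NOT real gdbtable bytes, just enough structure for the tag match.
line = (b'\x00\x01<DEFeatureClassInfo ...>'
        b'<CatalogPath>\\My_GDB.gdb\\Roads</CatalogPath>...')

open_tag = b'<DEFeatureClassInfo'
if open_tag in line:
    raw = line.split(open_tag)[1].split(b'<CatalogPath>')[1].split(b'</CatalogPath>')[0]
    # Decode and drop the leading backslash, as in the snippet above
    catalog_path = raw.decode()[1:]

print(catalog_path)  # My_GDB.gdb\Roads
```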

 

Here's the timings on a local database:

Dataset                Local Sequential   Local Threaded   Local Raw
empty                   0.06s              0.13s            0.02s
tiny                    0.16s              0.21s            0.05s
flat_small              0.25s              0.32s            0.19s
flat_medium             1.18s              1.22s            0.97s
nested_medium           0.98s              1.02s            0.66s
wide_datasets           1.39s              1.23s            0.65s
deep_only               0.20s              0.28s            0.08s
rc_heavy                0.58s              0.54s            0.36s
xl                      7.39s              5.50s            4.21s
populated_production    3.20s              2.35s            2.14s

 

And with the network databases:

Dataset                Network Sequential   Network Threaded   Network Raw
empty                    0.27s                0.54s             0.09s
tiny                     0.58s                1.01s             0.27s
flat_small               1.53s                1.76s             1.13s
flat_medium              6.09s                6.79s             5.22s
nested_medium            4.84s                5.60s             3.57s
wide_datasets           28.42s               11.34s             8.03s
deep_only                2.84s                1.88s             1.01s
rc_heavy                 2.72s                2.40s             1.72s
xl                      39.19s               30.27s            24.11s
populated_production    13.36s                9.62s             8.12s
JoshuaBixby
MVP Esteemed Contributor

I have used some of the publicly available reverse-engineered FGDB specifications to do this exact same kind of thing. I even created a locking mechanism that emulates what some SDK calls do to ensure I am playing nice within the FGDB when reading its files, but that is a different discussion thread.

Looking at your network dataset times, I suspect 2/3 to 3/4 would actually benefit from multi-processing instead of multi-threading.  The bottleneck with Walk isn't the FGDB API but something in either Walk itself or internal code that Walk depends upon.  It is perfectly fine to have multiple, even many, processes reading the same FGDB at the same time without issue.  Even though initializing an arcpy license does take a few seconds, if enumerating a data type in an network share FGDB takes tens of seconds, that 3-second hit from initializing a library isn't that big.