Select to view content in your preferred language

Multi-threading on a large point dataset

139
2
2 weeks ago
David_Brooks
MVP Regular Contributor

Is it possible to multi-thread the following python script? 

Background:

The script loops through a 3D point grid and assigns soil classification values to the points based on the presence of a list of soil-type raster datasets.

I have tried implementing pathos.multiprocessing, multiprocessing and concurrent.futures, but have failed due to limited knowledge of their capabilities.

Given that the point grid will eventually contain 1 million points, and the non-threaded tool runs at about 10 points per second, I would think there must be an efficient way to break up the point dataset into chunks based on a user defined number of cpu cores and have the function work on each chunk in parrallel and then rebuild the point datasets at the end?

import arcpy

# Parameters
point_fc = arcpy.GetParameterAsText(0)  # Input point feature class
raster_list = arcpy.GetParameterAsText(1).split(";")  # Semi-colon separated list of rasters
unit_field = "Unit"

# Ensure the 'Unit' field exists
if unit_field not in [field.name for field in arcpy.ListFields(point_fc)]:
    arcpy.AddField_management(point_fc, unit_field, "TEXT", field_length=50)

# Initialize classification counter
classified_count = 0

# Start editing session
with arcpy.da.UpdateCursor(point_fc, ["SHAPE@X", "SHAPE@Y", "SHAPE@Z", unit_field]) as cursor:
    for row in cursor:
        point_x = row[0]
        point_y = row[1]
        point_z = row[2]
        assigned_unit = "NoData"
        
        # Check each raster to find the first one below the point
        for raster_path in raster_list:
            raster_name = arcpy.Describe(raster_path).name
            try:
                # Get the raster cell value at the (X, Y) location
                raster_value = arcpy.GetCellValue_management(raster_path, f"{point_x} {point_y}")
                raster_value = raster_value.getOutput(0)
                
                # Skip if the raster value is 'NoData'
                if raster_value == "NoData":
                    continue
                
                # Convert to float and compare with point Z
                raster_value = float(raster_value)
                if point_z > raster_value:
                    assigned_unit = raster_name
                    classified_count += 1  # Increment counter for a valid classification
                    arcpy.AddMessage(f"Point classified as {assigned_unit} (Total classified: {classified_count})")
                    break  # Stop at the first valid raster below the point
            except Exception as e:
                arcpy.AddMessage(f"Skipping raster {raster_name}: {e}")
        
        # Update the Unit field
        row[3] = assigned_unit
        cursor.updateRow(row)

# Final message with total classified points
arcpy.AddMessage(f"Soil unit assignment complete. Total points classified: {classified_count}")

 


David
..Maps with no limits..
0 Kudos
2 Replies
DanPatterson
MVP Esteemed Contributor

have you looked at

Extract Multi Values to Points (Spatial Analyst)—ArcGIS Pro | Documentation

as an initial step since you seem to be processing multiple rasters using input points


... sort of retired...
HaydenWelch
MVP

I believe that DanPatterson's answer is probably the best way, using builtin functionality is always going to be your best bet. If you do need to try and parallelize your code, you could try something like this:

 

 

from multiprocessing import Manager, Process
from multiprocessing.managers import DictProxy
from pathlib import Path

from arcpy import AddError, AddMessage, GetParameterAsText, ListFields

from arcpy.management import GetCellValue, AddField
from arcpy.da import SearchCursor, UpdateCursor

def execute() -> None:
    # Parameters
    point_fc = GetParameterAsText(0)  # Input point feature class
    raster_list = GetParameterAsText(1).split(";")  # Semi-colon separated list of rasters
    unit_field = "Unit"
    point_fields = ["OID@", "SHAPE@X", "SHAPE@Y", "SHAPE@Z", unit_field]
    
    # Ensure the 'Unit' field exists
    if unit_field not in [field.name for field in ListFields(point_fc)]:
        AddField(point_fc, unit_field, "TEXT", field_length=50)

    # Cache the points in memory
    points = [row for row in SearchCursor(point_fc, point_fields)]

    # Initialize a shared memory dictionary for updates
    manager = Manager()
    updates: DictProxy = manager.dict()

    # Create a Generator pool of arguments for parallel processing
    # Using a generator here will save memory by not creating every
    # possible combination of arguments in memory at once.
    args_pool =\
    (
        row + [raster_path, updates]
        for row in points
        for raster_path in raster_list
    )

    # Create a pool of processes (Also uses a generator so processes are not created all at once)
    # Stolen from here: https://stackoverflow.com/questions/38393269/fill-up-a-dictionary-in-parallel-with-multiprocessing
    job = (Process(target=classify_point, args=args) for args in args_pool)
    
    # Start processes 
    # No need to capture the return values here since we are passing the
    # shared memory dictionary to each process and updating that in the function.
    _ = [p.start() for p in job]
    
    # Wait for all processes to finish
    _ = [p.join() for p in job]
    
    # Start editing session
    classified_count = 0
    with UpdateCursor(point_fc, point_fields) as cursor:
        for row in cursor:
            row = dict(zip(point_fields, row))
            if row['OID@'] in updates:
                cursor.updateRow(updates[row['OID@']])
                classified_count += 1

    # Final message with total classified points
    AddMessage(f"Soil unit assignment complete. Total points classified: {classified_count}")

# Pool on a single raster and point
def classify_point(point_id: int, point_x: float, point_y: float, point_z: float, unit_field: str, raster_path: Path, updates: dict) -> None:
    """Classify a point based on a raster value"""
    assigned_unit = "NoData"
    raster_name = Path(raster_path).stem
    try:
        # Get the raster cell value at the (X, Y) location
        raster_value, _ = GetCellValue(raster_path, f"{point_x} {point_y}")
    except Exception as e:
        AddError(f"Error reading raster {raster_name}: {e}")
        print(f"Error reading raster {raster_name}: {e}")
        return
    
    # Skip if the raster value is 'NoData'
    if raster_value == "NoData":
        return
    
    # Convert to float and compare with point Z
    raster_value = float(raster_value)
    if point_z > raster_value:
        assigned_unit = raster_name
    
    # Update the shared memory dictionary
    updates[point_id] = (point_id, point_x, point_y, point_z, assigned_unit)

 

 

This is untested, but the general flow should work. The trick is to pull your code out of the cursor context and do all your batch processing (per-raster) in memory, then apply those updates all at once. I'm not totally sure how well arcpy functions play with multiprocessing, but this should be a good starting place for you if you decide to go down this road.

For completeness, here's an example (also untested) that uses the ExtractMultiValuesToPoints function:

 

from pathlib import Path

from arcpy import GetParameterAsText, ListFields

from arcpy.management import AddField, CopyFeatures
from arcpy.sa import ExtractMultiValuesToPoints
from arcpy.da import UpdateCursor, SearchCursor

# Parameters
point_fc = GetParameterAsText(0)  # Input point feature class
raster_list = GetParameterAsText(1).split(";")  # Semi-colon separated list of rasters
unit_field = "Unit"
no_match_value = "NoMatch"

# Create a list of rasters and their corresponding field names
raster_mappings = [
    # Raster path     |     Raster name
    [str(Path(raster)), f"{Path(raster).stem}"] 
    for raster in raster_list
]

# Set up the cursor fields
cursor_fields = [unit_field] + [raster_name for _, raster_name in raster_mappings]

# Add the unit field if it doesn't exist
if unit_field not in [field.name for field in ListFields(point_fc)]:
    AddField(point_fc, unit_field, "TEXT", field_length=50)

# Operate on a copy of the data
point_copy = CopyFeatures(point_fc, "memory/point_copy")

# Extract the values from the rasters
ExtractMultiValuesToPoints(point_copy, raster_mappings, "NONE")

# filter out NoData and None/Null values
point_updates = [
    tuple(filter(lambda v: v[1] and not v[1] == 'NoData', zip(cursor_fields, row)))
    for row in SearchCursor(point_copy, cursor_fields)
]

# Update the original data (the two features are 1:1 so we can zip them together)
with UpdateCursor(point_fc, [unit_field]) as cursor:
    for _, match_rasters in zip(cursor, point_updates):
        
        # If no matching rasters are found, update the value to no_match_value
        if not match_rasters:
            cursor.updateRow([no_match_value])
            continue
        
        # Otherwise, update the value to the first matching raster
        raster_name, _ = match_rasters[0]
        cursor.updateRow([raster_name])

 

 

0 Kudos