
A new way of interacting with FeatureClasses

3 weeks ago
Status: Open
HaydenWelch
Occasional Contributor II

I've been working on a module that wraps common arcpy operations in objects and makes interacting with them more "Pythonic". Basically, I wrote a bunch of 'dunder' methods and error handling so that you can initialize an object with a path to a Shapefile or FeatureClass and then get a bunch of best-practice interactions with that data without having to write the usual boilerplate:

HaydenWelch_1-1718760396526.png

In this example: 

  1. Initialization with datapath
  2. Built in update cursor used to set one field equal to another
  3. Field added to flag records as "Over 5 Acres"
  4. The "OVER5ACRE" field is defaulted to 0
  5. A Query is applied to the object
  6. All visible records (queried above) have their "GIS5ACRE" field set to 1 (True)
  7. Query is removed
  8. A list of all values for "GIS5ACRE" in the table is summed (this is equivalent to using len() when the query is active)
  9. The length of the unqueried data is shown to compare
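To make the flow of those steps concrete without arcpy, here is a minimal sketch using a hypothetical in-memory stand-in. `MiniTable`, the `ACRES` field, and the sample rows are all made up for illustration and are not part of the module; a plain Python predicate stands in for the SQL query, and step 2 (copying one field to another) is left out.

```python
from typing import Any, Callable, Optional


class MiniTable:
    """Hypothetical in-memory stand-in for the arcpy-backed wrapper."""

    def __init__(self, rows: list[dict[str, Any]]):
        self.rows = rows
        # A Python predicate stands in for a SQL where clause (assumption)
        self.query: Optional[Callable[[dict[str, Any]], bool]] = None

    def _visible(self) -> list[dict[str, Any]]:
        """Rows that pass the active query (all rows when no query is set)."""
        if self.query is None:
            return self.rows
        return [row for row in self.rows if self.query(row)]

    def add_field(self, name: str, default: Any = None) -> None:
        """Add a field to every row with a default value."""
        for row in self.rows:
            row[name] = default

    def __getitem__(self, field: str) -> list[Any]:
        # table["FIELD"] returns the values of that field for visible rows
        return [row[field] for row in self._visible()]

    def __setitem__(self, field: str, value: Any) -> None:
        # table["FIELD"] = value writes the value to every visible row
        for row in self._visible():
            row[field] = value

    def __len__(self) -> int:
        return len(self._visible())


parcels = MiniTable([{"ACRES": 2.0}, {"ACRES": 7.5}, {"ACRES": 12.0}])  # step 1
parcels.add_field("OVER5ACRE", default=0)      # steps 3-4: add flag, default 0
parcels.query = lambda row: row["ACRES"] > 5   # step 5: apply a query
parcels["OVER5ACRE"] = 1                       # step 6: flag the visible rows
queried_count = len(parcels)                   # length while the query is active
parcels.query = None                           # step 7: remove the query
flagged = sum(parcels["OVER5ACRE"])            # step 8: sum the flags
total = len(parcels)                           # step 9: unqueried length
print(flagged, queried_count, total)
```

The sum of the flag field matches the length taken while the query was active, which is the equivalence step 8 describes.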

This is just a simple toy example. I can write up a more detailed explanation, but for now I just wanted to test the waters on this as a concept. Having the community help with maintaining an interface like this would mean we could bake a lot of the more convoluted Arcpy methodology into simpler syntax that more regular Python devs would be familiar with.

 

If anyone wants to peek at the code, here it is:

 

import arcpy
import os
from typing import Any, Generator

from archelp import message

ALL_FIELDS = object()

class DescribeModel:
    """ Base object for models """
        
    def __init__(self, path: os.PathLike):
        self.path = path
        desc = self._validate()
                
        # Set General Properties
        self.describe: Any = desc
        self.basename: str = desc.baseName
        self.workspace: os.PathLike = desc.path
        self.name: str = desc.name
        self.catalogpath: os.PathLike = desc.catalogPath
        self.extension: str = desc.extension
        self.type: str = desc.dataType
        self.data: dict[int, dict[str, Any]] = None
        return
    
    def update(self):
        """ Update the Describe object """
        updates = self.__class__(self.path)
        # Preserve a custom OIDField
        if hasattr(updates, "OIDField") and self.OIDField != updates.OIDField:
            updates.OIDField = self.OIDField
        self.__dict__.update(updates.__dict__)
        return
    
    def _validate(self):
        """ Validate the featurepath and return the Describe object """
        if arcpy.Exists(self.path):
            # Get Describe object
            desc = arcpy.Describe(self.path)
            # Raise TypeError if the datatype is not the expected datatype
            if desc.dataType != type(self).__name__ and type(self).__name__ != "DescribeModel":
                raise TypeError(f"{desc.baseName} is {desc.dataType}, must be {type(self).__name__}")
            return desc
        
        raise FileNotFoundError(f"{self.path} does not exist")
    
    def __repr__(self):
        return f"<{type(self).__name__}: {self.basename} @ {hex(id(self))}>"
    
    def __str__(self):
        return f"{type(self).__name__}: {self.basename}"

class Table(DescribeModel):
    """ Wrapper for basic Table operations """
    def __init__(self, tablepath: os.PathLike):
        super().__init__(tablepath)
        
        self._query: str = None
        self.fields: list[arcpy.Field] = arcpy.ListFields(self.path)
        self.fieldnames: list[str] = [field.name for field in self.fields]
        if hasattr(self.describe, "shapeFieldName"):
            shape_idx = self.fieldnames.index(self.describe.shapeFieldName)
            self.fieldnames[shape_idx] = "SHAPE@"   # Default to include full shape
        self.indexes: list[arcpy.Index] = self.describe.indexes
        self.OIDField: str = self.describe.OIDFieldName
        self.cursor_tokens: list[str] = \
            [
                "CREATED@",
                "CREATOR@",
                "EDITED@",
                "EDITOR@",
                "GLOBALID@",
                "OID@",
                "SUBTYPE@",
                "*"
            ]
        self.queried: bool = False
        self._queried_count: int = 0
        return
    
    def _validate_fields(self, fields: list[str]) -> bool:
        """ Validate field list for cursors """
        if fields != ["*"] and not all([field in self.fieldnames + self.cursor_tokens for field in fields]):
            return False
        return True
    
    @property
    def row_template(self) -> dict[str, Any]:
        """ Get a template for a row """
        return dict(zip(self.fieldnames, [None for _ in range(len(self.fieldnames))]))
    
    @property
    def query(self) -> str:
        """ Get the query string """
        return self._query
    
    @query.setter
    def query(self, query_string: str):
        """ Set the query string """
        if not query_string:
            # An empty query clears the filter; stop here so it is not re-applied below
            del self.query
            return
        try:
            with arcpy.da.SearchCursor(self.path, [self.OIDField], where_clause=query_string) as cur:
                cur.fields
        except Exception as e:
            message(f"Invalid query string ({query_string})", "warning")
        self._query = query_string
        self.data = [row for row in self]
        self._queried_count = len(self.data)
        self.queried = True
        return
    
    @query.deleter
    def query(self):
        """ Delete the query string """
        self._query = None
        self.queried = False
        self.data = None
        return
    
    def set_oid_field(self, oid_field: str) -> None:
        if oid_field in self.fieldnames:
            self.OIDField = oid_field
    
    def get_rows(self, fields: list[str] = ALL_FIELDS, as_dict: bool = False, **kwargs) -> Generator[list | dict, None, None]:
        """ Get rows from a table 
        fields: list of fields to return
        as_dict: return rows as dict if True
        kwargs: See arcpy.da.SearchCursor for kwargs
        """
        if fields is ALL_FIELDS or fields == ["*"]:
            fields = self.fieldnames
        if not self._validate_fields(fields):
            raise ValueError(f"Fields must be in {self.fieldnames + self.cursor_tokens}")
        
        query = self._query
        if 'where_clause' in kwargs:
            query = kwargs['where_clause']
            del kwargs['where_clause']
        
        with arcpy.da.SearchCursor(self.path, fields, where_clause=query, **kwargs) as cursor:
            for row in cursor:
                if as_dict:
                    yield dict(zip(fields, row))
                else:
                    if len(fields) == 1: yield row[0]
                    else: yield row
                    
    def update_rows(self, key: str, values: dict[Any, dict[str, Any]], **kwargs) -> int:
        """ Update rows in table
        key: field to use as key for update (must be in fields)
        values: dict of key value pairs to update [fieldname/token: value]
        kwargs: See arcpy.da.UpdateCursor for kwargs
        return: number of rows updated
        """
        if not values:
            raise ValueError("Values must be provided")
        fields = list(list(values.values())[0].keys())
        if not self._validate_fields(fields):
            raise ValueError(f"Fields must be in {self.fieldnames + self.cursor_tokens}")
        for val in values.values():
            if list(val.keys()) != fields:
                raise ValueError(f"Fields must match for all values in values dict")
        
        query = self._query
        if 'where_clause' in kwargs:
            query = kwargs['where_clause']
            del kwargs['where_clause']
        
        update_count = 0
        with arcpy.da.UpdateCursor(self.path, fields, where_clause=query, **kwargs) as cursor:
            for row in cursor:
                row_dict = dict(zip(fields, row))
                if row_dict[key] in values.keys():
                    cursor.updateRow(list(values[row_dict[key]].values()))
                    update_count += 1
        return update_count
    
    def _cursor(self, cur_type: str, fields: list[str]=ALL_FIELDS, **kwargs) -> arcpy.da.UpdateCursor | arcpy.da.SearchCursor | arcpy.da.InsertCursor:
        """ Internal cursor method to get cursor type
        """
        if fields is ALL_FIELDS or fields == ["*"]:
            fields = self.fieldnames
        if not self._validate_fields(fields):
            raise ValueError(f"Fields must be in {self.fieldnames + self.cursor_tokens}")
        if cur_type == "update":
            return arcpy.da.UpdateCursor(self.path, fields, where_clause=self.query, **kwargs)
        if cur_type == "search":
            return arcpy.da.SearchCursor(self.path, fields, where_clause=self.query, **kwargs)
        if cur_type == "insert":
            return arcpy.da.InsertCursor(self.path, fields, **kwargs)
        raise ValueError(f"Invalid cursor type {cur_type}")
    
    def update_cursor(self, fields: list[str]=ALL_FIELDS, **kwargs) -> arcpy.da.UpdateCursor:
        """ Get an update cursor for the table
        fields: list of fields to update
        kwargs: See arcpy.da.UpdateCursor for kwargs
        return: update cursor
        """
        return self._cursor("update", fields, **kwargs)
    
    def search_cursor(self, fields: list[str]=ALL_FIELDS, **kwargs) -> arcpy.da.SearchCursor:
        """ Get a search cursor for the table
        fields: list of fields to return
        kwargs: See arcpy.da.SearchCursor for kwargs
        return: search cursor
        """
        return self._cursor("search", fields, **kwargs)
    
    def insert_cursor(self, fields: list[str]=ALL_FIELDS, **kwargs) -> arcpy.da.InsertCursor:
        """ Get an insert cursor for the table
        fields: list of fields to insert
        kwargs: See arcpy.da.InsertCursor for kwargs
        return: insert cursor
        """
        return self._cursor("insert", fields, **kwargs)
    
    def insert_rows(self, values: list[dict[str, Any]], **kwargs) -> int:
        """ Insert rows into table
        values: list of dicts with key value pairs to insert [field: value]
        kwargs: See arcpy.da.InsertCursor for kwargs
        return: number of rows inserted
        """
        if not values:
            raise ValueError("Values must be provided")
        fields = list(values[0].keys())
        if not self._validate_fields(fields):
            raise ValueError(f"Fields must be in {self.fieldnames + self.cursor_tokens}")
        
        insert_count = 0
        with arcpy.da.InsertCursor(self.path, fields, **kwargs) as cursor:
            for value in values:
                cursor.insertRow([value[field] for field in fields])
                insert_count += 1
        return insert_count
    
    def delete_rows(self, key: str, values: list[Any], **kwargs) -> int:
        """ Delete rows from table
        key: field to use as key for delete (must be in fields)
        values: list of values to delete
        kwargs: See arcpy.da.UpdateCursor for kwargs
        return: number of rows deleted
        """
        if not values:
            raise ValueError("Values must be provided")
        delete_count = 0
        with arcpy.da.UpdateCursor(self.path, [key], **kwargs) as cursor:
            for row in cursor:
                if row[0] in values:
                    cursor.deleteRow()
                    delete_count += 1
        return delete_count
    
    def add_field(self, field_name: str, **kwargs) -> None:
        """ Add a field to the table 
        field_name: name of the field to add
        kwargs: See arcpy.management.AddField for kwargs
        """
        arcpy.management.AddField(self.path, field_name, **kwargs)
        self.update()
        return
    
    def delete_fields(self, field_names: list[str]) -> None:
        """ Delete fields from the table 
        field_names: names of the fields to delete
        """
        arcpy.management.DeleteField(self.path, field_names)
        self.update()
        return
    
    def __iter__(self):
        yield from self.get_rows(self.fieldnames, as_dict=True)
        
    def __len__(self):
        if self.queried:
            return self._queried_count
        return int(arcpy.management.GetCount(self.path).getOutput(0))
    
    def __getitem__(self, idx: int | str | list):
        
        if isinstance(idx, int):
            return next(self.get_rows(as_dict=True, where_clause=f"{self.OIDField} = {idx}"))
        
        if isinstance(idx, str) and idx in self.fieldnames + self.cursor_tokens:
            return [v for v in self.get_rows([idx])]
        
        if isinstance(idx, list) and all([isinstance(f, str) for f in idx]):
            if set(idx).issubset(self.fieldnames + self.cursor_tokens):
                return [v for v in self.get_rows(idx, as_dict=True)]
            raise KeyError(f"{idx} not in {self.fieldnames + self.cursor_tokens}")
            
        if isinstance(idx, list) and all([isinstance(f, int) for f in idx]):
            # Join the OIDs by hand: tuple([5]) formats as "(5,)", which is not valid SQL
            return [v for v in self.get_rows(where_clause=f"{self.OIDField} IN ({', '.join(map(str, idx))})", as_dict=True)]
        
        raise ValueError(f"{idx} is invalid, either pass field, OID, or list of fields or list of OIDs")
    
    def __setitem__(self, idx: int | str | list, values: list | Any):
        
        if isinstance(idx, int) and len(values) == len(self.fieldnames):
            self.update_rows(self.OIDField, {idx: dict(zip(self.fieldnames, values))})
            return
        
        if isinstance(idx, str) and idx in self.fieldnames + self.cursor_tokens:
            with arcpy.da.UpdateCursor(self.path, [idx], where_clause=self.query) as cursor:
                for row in cursor:
                    row[0] = values
                    cursor.updateRow(row)
            return
        
        if isinstance(idx, list) and set(idx).issubset(set(self.fieldnames + self.cursor_tokens)):
            with arcpy.da.UpdateCursor(self.path, idx, where_clause=self.query) as cursor:
                for row in cursor:
                    cursor.updateRow(values)
            return
        
        raise ValueError(f"{idx} not in {self.fieldnames + self.cursor_tokens} or index is out of range")
    
    def __delitem__(self, idx: int | str | list[str]):
        if isinstance(idx, int):
            self.delete_rows(self.OIDField, [idx])
            return
        
        if isinstance(idx, str) and idx in self.fieldnames:
            # delete_fields() already refreshes the Describe info via update()
            self.delete_fields([idx])
            return
        
        if isinstance(idx, list) and all(isinstance(field, str) for field in idx):
            if all([field in self.fieldnames for field in idx]):
                self.delete_fields(idx)
                return
            else:
                raise ValueError(f"{idx} not subset of {self.fieldnames}")
            
        if isinstance(idx, list) and all(isinstance(field, int) for field in idx):
            self.delete_rows(self.OIDField, idx)
            return
        raise KeyError(f"{idx} not in {self.fieldnames + self.cursor_tokens}")

class FeatureClass(Table):
    """ Wrapper for basic FeatureClass operations """    
    def __init__(self, shppath: os.PathLike):
        super().__init__(shppath)
        self.spatialReference: arcpy.SpatialReference = self.describe.spatialReference
        self.shapeType: str = self.describe.shapeType
        self.cursor_tokens.extend(
            [
                "SHAPE@XY",
                "SHAPE@TRUECENTROID",
                "SHAPE@X",
                "SHAPE@Y",
                "SHAPE@Z",
                "SHAPE@M",
                "SHAPE@JSON",
                "SHAPE@WKB",
                "SHAPE@WKT",
                "SHAPE@",
                "SHAPE@AREA",
                "SHAPE@LENGTH",
            ]
        )
      
class ShapeFile(FeatureClass):
    """ Wrapper for basic Shapefile operations"""
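
One detail worth care when indexing by a list of OIDs is how the `IN` clause is built. Formatting a Python tuple straight into the SQL text leaves a trailing comma for a one-element list. A minimal sketch of a safer helper follows; `oid_in_clause` is a hypothetical name, not part of the module, and the `OBJECTID` field name is only an example.

```python
def oid_in_clause(oid_field: str, oids: list[int]) -> str:
    """Build a SQL IN clause for a non-empty list of integer OIDs."""
    if not oids:
        raise ValueError("At least one OID is required")
    # int() guards against non-numeric values being formatted into the SQL text
    id_list = ", ".join(str(int(oid)) for oid in oids)
    return f"{oid_field} IN ({id_list})"


single = oid_in_clause("OBJECTID", [5])
several = oid_in_clause("OBJECTID", [1, 2, 3])
naive = f"OBJECTID IN {tuple([5])}"  # tuple formatting keeps the trailing comma

print(single)   # OBJECTID IN (5)
print(several)  # OBJECTID IN (1, 2, 3)
print(naive)    # OBJECTID IN (5,)
```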

 

 

1 Comment
HaydenWelch

Here's another fun example I came up with while testing out using alternate OID fields as object indexes:

HaydenWelch_1-1718770460902.png

At the end of the day, it's basically always going to be faster to write out all the boilerplate like you're supposed to, but being able to manipulate FeatureClasses as if they're Python objects can sometimes be worth a small time hit. The second example is functionally identical to the first, but it runs slightly slower because I'm still pulling the full shape object (SHAPE@) instead of the SHAPE@XY value that the ['*'] wildcard returns in cursors.

The last one is definitely an interesting simplification (in terms of code readability and not having to worry about intermediate variables). I was able to build the field value sets in place by iterating over the field names and, in that same loop, immediately print whether each field was unique.
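
As a rough illustration of that per-field pattern, here is a sketch on plain dictionaries rather than a real table. The sample rows and the `column` helper are made up; with the wrapper, the column lookup would presumably be a `table[field]` style access, which opens one cursor per field.

```python
from typing import Any

# Made-up sample rows standing in for table records
rows: list[dict[str, Any]] = [
    {"PARCEL_ID": "A1", "OWNER": "Smith", "ZONE": "R1"},
    {"PARCEL_ID": "A2", "OWNER": "Jones", "ZONE": "R1"},
    {"PARCEL_ID": "A3", "OWNER": "Smith", "ZONE": "C2"},
]


def column(records: list[dict[str, Any]], field: str) -> list[Any]:
    """Hypothetical stand-in for pulling all values of one field."""
    return [record[field] for record in records]


uniqueness: dict[str, bool] = {}
for field in rows[0]:
    values = column(rows, field)
    # A field is unique when de-duplicating its values loses nothing
    uniqueness[field] = len(set(values)) == len(values)
    print(f"{field}: {'unique' if uniqueness[field] else 'not unique'}")
```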

 

The last implementation is also the most expensive, since it creates len(fieldnames) cursor objects and has to go through all the error handling and data passthrough implemented in Table.get_rows(). The second example has to deal with that overhead too, though, so the extra time in the third is almost certainly spent creating and tearing down the 20+ cursors, one for each field check.
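
If the per-field cursors are the bottleneck, one possible alternative is to check every field in a single pass over the rows. The sketch below uses plain dictionaries with made-up data; `unique_fields_single_pass` is a hypothetical helper, and it assumes the field values are hashable, which may not hold for geometry objects.

```python
from typing import Any, Iterable


def unique_fields_single_pass(
    records: Iterable[dict[str, Any]], fields: list[str]
) -> dict[str, bool]:
    """Report, per field, whether all values are distinct, reading each row once."""
    seen: dict[str, set] = {field: set() for field in fields}
    unique: dict[str, bool] = {field: True for field in fields}
    for record in records:
        for field in fields:
            value = record[field]
            if value in seen[field]:
                unique[field] = False  # a repeat means the field is not unique
            else:
                seen[field].add(value)
    return unique


# Made-up sample rows standing in for table records
sample = [
    {"PARCEL_ID": "A1", "OWNER": "Smith"},
    {"PARCEL_ID": "A2", "OWNER": "Jones"},
    {"PARCEL_ID": "A3", "OWNER": "Smith"},
]
result = unique_fields_single_pass(sample, ["PARCEL_ID", "OWNER"])
print(result)
```

This trades the one-liner readability of the per-field version for a single read of the data, so whether it is worth it depends on how many fields and rows are involved.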