I've been working on a module that wraps common arcpy operations with Arcpy objects and makes interacting with them more "Pythonic". Basically meaning I wrote a bunch of 'dunder' methods and error handling that allows you to initialize an object with a path to a Shapefile or FeatureClass and then get a bunch of best practice interactions with that data without having to use boilerplate:
In this example:
This is just a simple toy example. I can write up a more detailed explanation, but for now I just wanted to test the waters on this as a concept. Having the community help with maintaining an interface like this would mean we could bake a lot of the more convoluted Arcpy methodology into simpler syntax that more regular Python devs would be familiar with.
If anyone wants to peek at the code here it is:
import arcpy
import os
from typing import Any, Generator
from archelp import message
ALL_FIELDS = object()
class DescribeModel:
""" Base object for models """
def __init__(self, path: os.PathLike):
self.path = path
desc = self._validate()
# Set General Properties
self.describe: Any = desc
self.basename: str = desc.baseName
self.workspace: os.PathLike = desc.path
self.name: str = desc.name
self.catalogpath: os.PathLike = desc.catalogPath
self.extension: str = desc.extension
self.type: str = desc.dataType
self.data: dict[int, dict[str, Any]] = None
return
def update(self):
""" Update the Describe object """
updates = self.__class__(self.path)
# Preserve a custom OIDField
if hasattr(updates, "OIDField") and self.OIDField != updates.OIDField:
updates.OIDField = self.OIDField
self.__dict__.update(updates.__dict__)
return
def _validate(self):
""" Validate the featurepath and return the Describe object """
if arcpy.Exists(self.path):
# Get Describe object
desc = arcpy.Describe(self.path)
# Raise TypeError if the datatype is not the expected datatype
if desc.dataType != type(self).__name__ and type(self).__name__ != "Describe Model":
raise TypeError(f"{desc.baseName} is {desc.dataType}, must be {type(self).__name__}")
return desc
raise FileNotFoundError(f"{self.path} does not exist")
def __repr__(self):
return f"<{type(self).__name__}: {self.basename} @ {hex(id(self))}>"
def __str__(self):
return f"{type(self).__name__}: {self.basename}"
class Table(DescribeModel):
""" Wrapper for basic Table operations """
def __init__(self, tablepath: os.PathLike):
super().__init__(tablepath)
self._query: str = None
self.fields: list[arcpy.Field] = arcpy.ListFields(self.path)
self.fieldnames: list[str] = [field.name for field in self.fields]
if hasattr(self.describe, "shapeFieldName"):
shape_idx = self.fieldnames.index(self.describe.shapeFieldName)
self.fieldnames[shape_idx] = "SHAPE@" # Default to include full shape
self.indexes: list[arcpy.Index] = self.describe.indexes
self.OIDField: str = self.describe.OIDFieldName
self.cursor_tokens: list[str] = \
[
"CREATED@",
"CREATOR@",
"EDITED@",
"EDITOR@",
"GLOBALID@",
"OID@",
"SUBTYPE@",
"*"
]
self.queried: bool = False
self._queried_count: int = 0
return
def _validate_fields(self, fields: list[str]) -> bool:
""" Validate field list for cursors """
if fields != ["*"] and not all([field in self.fieldnames + self.cursor_tokens for field in fields]):
return False
return True
@property
def row_template(self) -> dict[str, Any]:
""" Get a template for a row """
return dict(zip(self.fieldnames, [None for _ in range(len(self.fieldnames))]))
@property
def query(self) -> str:
""" Get the query string """
return self._query
@query.setter
def query(self, query_string: str):
""" Set the query string """
if not query_string:
del self.query
try:
with arcpy.da.SearchCursor(self.path, [self.OIDField], where_clause=query_string) as cur:
cur.fields
except Exception as e:
message(f"Invalid query string ({query_string})", "warning")
self._query = query_string
self.data = [row for row in self]
self._queried_count = len(self.data)
self.queried = True
return
@query.deleter
def query(self):
""" Delete the query string """
self._query = None
self.queried = False
self.data = None
return
def set_oid_field(self, oid_field: str) -> None:
if oid_field in self.fieldnames:
self.OIDField = oid_field
def get_rows(self, fields: list[str] = ALL_FIELDS, as_dict: bool = False, **kwargs) -> Generator[list | dict, None, None]:
""" Get rows from a table
fields: list of fields to return
as_dict: return rows as dict if True
kwargs: See arcpy.da.SearchCursor for kwargs
"""
if fields == ALL_FIELDS or fields == ["*"]:
fields = self.fieldnames
if not self._validate_fields(fields):
raise ValueError(f"Fields must be in {self.fieldnames + self.cursor_tokens}")
query = self._query
if 'where_clause' in kwargs:
query = kwargs['where_clause']
del kwargs['where_clause']
with arcpy.da.SearchCursor(self.path, fields, where_clause=query, **kwargs) as cursor:
for row in cursor:
if as_dict:
yield dict(zip(fields, row))
else:
if len(fields) == 1: yield row[0]
else: yield row
def update_rows(self, key: str, values: dict[Any, dict[str, Any]], **kwargs) -> int:
""" Update rows in table
key: field to use as key for update (must be in fields)
values: dict of key value pairs to update [fieldname/token: value]
kwargs: See arcpy.da.UpdateCursor for kwargs
return: number of rows updated
"""
if not values:
raise ValueError("Values must be provided")
fields = list(list(values.values())[0].keys())
if not self._validate_fields(fields):
raise ValueError(f"Fields must be in {self.fieldnames + self.cursor_tokens}")
for val in values.values():
if list(val.keys()) != fields:
raise ValueError(f"Fields must match for all values in values dict")
query = self._query
if 'where_clause' in kwargs:
query = kwargs['where_clause']
del kwargs['where_clause']
update_count = 0
with arcpy.da.UpdateCursor(self.path, fields, where_clause=query, **kwargs) as cursor:
for row in cursor:
row_dict = dict(zip(fields, row))
if row_dict[key] in values.keys():
cursor.updateRow(list(values[row_dict[key]].values()))
update_count += 1
return update_count
def _cursor(self, cur_type: str, fields: list[str]=ALL_FIELDS, **kwargs) -> arcpy.da.UpdateCursor | arcpy.da.SearchCursor | arcpy.da.InsertCursor:
""" Internal cursor method to get cursor type
"""
if fields is ALL_FIELDS or fields == ["*"]:
fields = self.fieldnames
if not self._validate_fields(fields):
raise ValueError(f"Fields must be in {self.fieldnames + self.cursor_tokens}")
if cur_type == "update":
return arcpy.da.UpdateCursor(self.path, fields, where_clause=self.query, **kwargs)
if cur_type == "search":
return arcpy.da.SearchCursor(self.path, fields, where_clause=self.query, **kwargs)
if cur_type == "insert":
return arcpy.da.InsertCursor(self.path, fields, **kwargs)
raise ValueError(f"Invalid cursor type {cur_type}")
def update_cursor(self, fields: list[str]=ALL_FIELDS, **kwargs) -> arcpy.da.UpdateCursor:
""" Get an update cursor for the table
fields: list of fields to update
kwargs: See arcpy.da.UpdateCursor for kwargs
return: update cursor
"""
return self._cursor("update", fields, **kwargs)
def search_cursor(self, fields: list[str]=ALL_FIELDS, **kwargs) -> arcpy.da.SearchCursor:
""" Get a search cursor for the table
fields: list of fields to return
kwargs: See arcpy.da.SearchCursor for kwargs
return: search cursor
"""
self._cursor("search", fields, **kwargs)
def insert_cursor(self, fields: list[str]=ALL_FIELDS, **kwargs) -> arcpy.da.InsertCursor:
""" Get an insert cursor for the table
fields: list of fields to insert
kwargs: See arcpy.da.InsertCursor for kwargs
return: insert cursor
"""
self._cursor("insert", fields, **kwargs)
def insert_rows(self, values: list[dict[Any, dict[str, Any]]], **kwargs) -> int:
""" Insert rows into table
fields: list of fields to insert
values: list of dicts with key value pairs to insert [field: value]
return: number of rows inserted
"""
if not values:
raise ValueError("Values must be provided")
fields = list(values[0].keys())
if not self._validate_fields(fields):
raise ValueError(f"Fields must be in {self.fieldnames + self.cursor_tokens}")
insert_count = 0
with arcpy.da.InsertCursor(self.path, fields, **kwargs) as cursor:
for value in values:
cursor.insertRow([value[field] for field in fields])
insert_count += 1
return insert_count
def delete_rows(self, key: str, values: list[Any], **kwargs) -> int:
""" Delete rows from table
key: field to use as key for delete (must be in fields)
values: list of values to delete
kwargs: See arcpy.da.UpdateCursor for kwargs
return: number of rows deleted
"""
if not values:
raise ValueError("Values must be provided")
delete_count = 0
with arcpy.da.UpdateCursor(self.path, [key], **kwargs) as cursor:
for row in cursor:
if row[0] in values:
cursor.deleteRow()
delete_count += 1
return delete_count
def add_field(self, field_name: str, **kwargs) -> None:
""" Add a field to the table
field_name: name of the field to add
kwargs: See arcpy.management.AddField for kwargs
"""
arcpy.management.AddField(self.path, field_name, **kwargs)
self.update()
return
def delete_fields(self, field_names: list[str]) -> None:
""" Delete a field from the table
field_name: name of the field to delete
"""
arcpy.management.DeleteField(self.path, field_names)
self.update()
return
def __iter__(self):
yield from self.get_rows(self.fieldnames, as_dict=True)
def __len__(self):
if self.queried:
return self._queried_count
return int(arcpy.management.GetCount(self.path).getOutput(0))
def __getitem__(self, idx: int | str):
if isinstance(idx, int):
return self.get_rows(as_dict=True, where_clause=f"{self.OIDField} = {idx}").__next__()
if isinstance(idx, str) and idx in self.fieldnames + self.cursor_tokens:
return [v for v in self.get_rows([idx])]
if isinstance(idx, list) and all([isinstance(f, str) for f in idx]):
if set(idx).issubset(self.fieldnames + self.cursor_tokens):
return [v for v in self.get_rows(idx, as_dict=True)]
raise KeyError(f"{idx} not in {self.fieldnames + self.cursor_tokens}")
if isinstance(idx, list) and all([isinstance(f, int) for f in idx]):
return [v for v in self.get_rows(where_clause=f"{self.OIDField} IN {tuple(idx)}", as_dict=True)]
raise ValueError(f"{idx} is invalid, either pass field, OID, or list of fields or lis of OIDs")
def __setitem__(self, idx: int | str | list, values: list | Any):
if isinstance(idx, int) and len(values) == len(self.fieldnames):
self.update_rows(self.OIDField, {idx: dict(zip(self.fieldnames, values))})
return
if isinstance(idx, str) and idx in self.fieldnames + self.cursor_tokens:
with arcpy.da.UpdateCursor(self.path, [idx], where_clause=self.query) as cursor:
for row in cursor:
row[0] = values
cursor.updateRow(row)
return
if isinstance(idx, list) and set(idx).issubset(set(self.fieldnames + self.cursor_tokens)):
with arcpy.da.UpdateCursor(self.path, idx, where_clause=self.query) as cursor:
for row in cursor:
cursor.updateRow(values)
return
raise ValueError(f"{idx} not in {self.fieldnames + self.cursor_tokens} or index is out of range")
def __delitem__(self, idx: int | str | list[str]):
if isinstance(idx, int):
self.delete_rows(self.OIDField, [idx])
return
if isinstance(idx, str) and idx in self.fieldnames:
self.delete_fields([idx])
self.update()
return
if isinstance(idx, list) and all(isinstance(field, str) for field in idx):
if all([field in self.fieldnames for field in idx]):
self.delete_fields(idx)
return
else:
raise ValueError(f"{idx} not subset of {self.fieldnames}")
if isinstance(idx, list) and all(isinstance(field, int) for field in idx):
self.delete_rows(self.OIDField, idx)
return
raise KeyError(f"{idx} not in {self.fieldnames + self.cursor_tokens}")
class FeatureClass(Table):
""" Wrapper for basic FeatureClass operations """
def __init__(self, shppath: os.PathLike):
super().__init__(shppath)
self.spatialReference: arcpy.SpatialReference = self.describe.spatialReference
self.shapeType: str = self.describe.shapeType
self.cursor_tokens.extend(
[
"SHAPE@XY",
"SHAPE@TRUECENTROID",
"SHAPE@X",
"SHAPE@Y",
"SHAPE@Z",
"SHAPE@M",
"SHAPE@JSON",
"SHAPE@WKB",
"SHAPE@WKT",
"SHAPE@",
"SHAPE@AREA",
"SHAPE@LENGTH",
]
)
class ShapeFile(FeatureClass):
""" Wraper for basic Shapefile operations"""
Here's another fun example I came up with while testing out using alternate OID fields as object indexes:
At the end of the day it's basically always going to be faster to write out all the boilerplate like you're supposed to, but being able to manipulate FeatureClasses as if they're python objects can sometimes be worth a small time hit. The Second example is actually functionally identical to the first, but there's a small time hit because I'm still pulling the full Shape object instead of using Shape@XY that is returned by the ['*'] wildcard in cursors.
The last one is definitely an interesting simplification (in terms of code readability and not having to worry about intermediate variables). I was able to build the fieldvalue sets in place by iterating over the field names then in that same loop immediately print out if that field was unique or not.
The last implementation is also the most process intensive as it creates len(fieldnames) Cursor objects and has to interface with all the error handling and data passthroughs implemented in the Table.get_rows() function, but then again the second example also has to deal with that so the time suck for 3 is for sure in the initialization and deletion of 20+ cursors for each field check.
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.