Hello, The script below joins a table to a feature class and then copies specific features from there into a different, existing feature class. This has run smoothly for over a year, but recently there was a change in the database that the table comes from, and now the script fails at the append step. Nothing has changed on the gis-side - feature classes, locations, etc. We've been tinkering with this for months now and are not getting any closer to a solution.
The traceback we're getting has it specifically failing on the 'with arcpy.da.SearchCursor(...' line (Line 359 ). It doesn't make it to the for loop or .getMessages()
This has been tinkered with a lot since the last time it was successfully run. First we were told not to do a copy feature / paste feature (how it came out of Model Builder), but rather an insert. Then there was a question of whether or not an edit session was needed. Then it was suggested we use an insert cursor... I built the original in ModelBuilder and exported to Python, which is about the extent of my skills. A colleague who is skilled in Python but new to GIS has been doing most of the work now. Any idea what might be going on? Really appreciate your help!
#-------------------------------------------------------------------------------
# Name: SurveyPlatAutomation
# Purpose: This script identifies surveys marked complete in Accela and grabs
# selected information for each from Accela. The parcel ID from
# Accela is used to select a parcel, copy its geometry to the Survey
# Plat layer in editgis, and populate the attribute table with the
# Accela field information.
#
# Author: mlauer@bouldercounty.gov
#
# Created: 01/21/2025
#
# Updated: 01/21/2026 - sent from Sean
# 01/28/2026 - updated connection string/query for new server
# 02/09/2026 - fixed sql query, tried arcpy Editor class
# 02/17/2026 - Editor crashed at start of run, trying InsertCursor
# 03/04/2026 - format dataframe to match feature class exactly
# 04/06/2026 - check for datum errors
# 04/16/2026 - strip csv table prefix from table before insert
#-------------------------------------------------------------------------------
from datetime import date
import os
import shutil
import sys
import time
import arcpy
import pandas as pd
import pyodbc
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
# new server
connection_string = (
REMOVED
)
query = """
SELECT *
FROM OPENQUERY([BOCO-ASYNC-PROD-DB.US.ERDB.ACCELA.COM,14332], '
SELECT V_RECORD.RECORD_ID AS survey_id
,V_RECORD.RECORD_TYPE AS docket_type
,V_RECORD.DATE_OPENED AS app_date
,V_RECORD.SHORT_NOTES AS document_title
,V_RECORD.[DESCRIPTION] AS description
,V_PARCEL.PARCEL_NBR AS parcel_id
,V_RECORD.ADDR_FULL_LINE# AS address
FROM V_RECORD
JOIN V_PARCEL ON V_RECORD.RECORD_ID = V_PARCEL.RECORD_ID
WHERE V_RECORD.DATE_OPENED >= ''2025-01-01''
AND V_RECORD.RECORD_TYPE = ''Land Survey Plat''
AND V_RECORD.[STATUS] = ''Accepted Completed''
')
"""
def main():
    """Pull completed surveys from Accela, copy matching parcel geometry into
    the Survey Plat feature class in editgis, and email a processing report.

    Side effects: writes a CSV and file-gdb table under PlatProcessingAutomation,
    recreates the SurveysAdded feature class, inserts rows into
    lu.LU.SURV_SurveyPlat inside an edit session, and sends two emails.
    """
    start_time = time.time()
    formatted_date = date.today().strftime('%m/%d/%Y')

    surveys = query_accela(
        connection_string,
        query
    )

    split_surveys = return_by_parcel_id(surveys)
    # Guard: return_by_parcel_id() historically returned False for an empty
    # query result, which crashed the subscripts below with a TypeError.
    if not split_surveys:
        print('No surveys returned from Accela - nothing to process.')
        return

    # get lists from returned dictionary
    process_surveys = split_surveys['surveys']
    missing_surveys = split_surveys['missing_id']
    multi_surveys = split_surveys['multi_id']

    # fields that must be non-NULL for a survey to be processed
    # (parcel_id is intentionally not checked, matching prior behavior)
    required_keys = (
        'survey_id', 'docket_type', 'app_date', 'document_title', 'description'
    )
    # Only the FIRST matching SEC variant is applied (mirrors the old
    # if/elif chain); the remaining abbreviations are applied in sequence.
    sec_variants = (('SECTION ', 'SEC'), ('SEC. ', 'SEC'), ('SEC ', 'SEC'))
    other_abbrs = (
        ('TOWNSHIP ', 'T'), ('RANGE ', 'R'), ('QUARTER', '1/4'),
        ('NORTHEAST', 'NE'), ('SOUTHEAST', 'SE'), ('SOUTHWEST', 'SW'),
        ('NORTHWEST', 'NW'), ('NORTH', 'N'), ('EAST', 'E'),
        ('SOUTH', 'S'), ('WEST', 'W'),
    )

    feature_class_update = []       # attribute dicts destined for the fc
    surveys_with_null_values = []   # records rejected for NULL fields

    for survey in process_surveys:
        # check for null values
        if any(survey[key] is None for key in required_keys):
            surveys_with_null_values.append(survey)
            continue

        feature_attributes = {
            'survey_id': survey['survey_id'],
            'parcel_id': survey['parcel_id'],
        }

        # set survey_type from the document title prefix
        title = survey['document_title']
        if title.startswith('IMPROVEMENT'):
            feature_attributes['survey_type'] = 'Improvement Survey Plat'
        elif title.startswith(('ALTA', 'LAND')):
            feature_attributes['survey_type'] = 'Land Survey Plat'
        else:
            feature_attributes['survey_type'] = 'Other Type of Plat'

        # set survey_descr: abbreviate the PLSS description
        abbr_description = survey['description']
        for needle, replacement in sec_variants:
            if needle in abbr_description:
                abbr_description = abbr_description.replace(needle, replacement)
                break
        for needle, replacement in other_abbrs:
            abbr_description = abbr_description.replace(needle, replacement)

        # concat document_title and abbreviated description, cap at 254 chars
        final_description = title + ', ' + abbr_description
        feature_attributes['survey_descr'] = final_description[:254]

        # set survey_year (app_date is non-NULL per the check above, but
        # keep the defensive '0000' default)
        if survey['app_date'] is None:
            feature_attributes['survey_year'] = '0000'  # check with Sean...
        else:
            feature_attributes['survey_year'] = str(survey['app_date'].year)

        # set PLSS_COORD, it will always equal 'N'
        feature_attributes['PLSS_COORD'] = 'N'

        # set survey_link: <yy><last-4-of-id>-01.pdf under the scanned maps share
        path = r'V:\ScannedMaps\SURVEY\LANDSURVEY''\\'
        year_last_2 = feature_attributes['survey_year'][-2:]
        survey_id_last_4 = feature_attributes['survey_id'][-4:]
        filename = year_last_2 + survey_id_last_4 + '-01' + '.pdf'
        feature_attributes['link'] = path + filename

        feature_class_update.append(feature_attributes)

    # accela data as a dataframe, renamed/ordered to match the feature class
    df = pd.DataFrame(feature_class_update)
    clean_df = rename_reorder_df(
        df,
        rename_map={
            'survey_id': 'SURVEY_ID',
            'survey_type': 'SURVEY_TYP',
            'survey_descr': 'SURVEY_DES',
            'survey_year': 'SURVEY_YR',
            'PLSS_COORD': 'PLSS_COORD',
            'link': 'LINK',
            'parcel_id': 'PARCEL_NO'
        },
        column_order=[
            'SURVEY_ID', 'SURVEY_TYP', 'SURVEY_DES', 'SURVEY_YR', 'PLSS_COORD',
            'LINK', 'PARCEL_NO'
        ]
    )

    # the gis
    print('GIS functions:')
    arcpy.env.overwriteOutput = True

    # cleanup from previous session
    print('Deleting old SurveysAdded...')
    surveys_added = r'V:\\prjlu\\SurveyPlats\\PlatProcessingAutomation\\SurveyProcessing.gdb\\SurveysAdded'
    arcpy.management.Delete(
        in_data=[surveys_added]
    )

    print('Saving Accela df as csv...')
    accela_table = r'V:\\prjlu\\SurveyPlats\\PlatProcessingAutomation\\TableFromAccela.csv'
    clean_df.to_csv(accela_table, index=False)

    # csv to file-gdb table
    csv_table = 'csv_table'
    workspace = r'V:\\prjlu\\SurveyPlats\\PlatProcessingAutomation\\SurveyProcessing.gdb'
    table_output = arcpy.conversion.TableToTable(
        accela_table,
        workspace,
        csv_table
    )

    print('Creating table view...')
    table_view = 'csv_table_view'
    arcpy.management.MakeTableView(table_output, table_view)

    print('Creating temporary layer from Parcels feature class...')
    parcels = r'V:\\gislu\\_DatabaseConnection\\regis1.boco.gis.dc.sde\\boco.PARCELS.PARCELS'
    parcels_layer = 'parcels_layer'
    arcpy.management.MakeFeatureLayer(parcels, parcels_layer)

    print('Joining csv table view to Parcels feature class...')
    arcpy.management.AddJoin(
        in_layer_or_view=parcels_layer,
        in_field='PARCEL_NO',
        join_table=table_view,
        join_field='PARCEL_NO',
        join_type='KEEP_COMMON'
    )

    print('Exporting table to SurveysAdded feature class...')
    arcpy.management.CopyFeatures(parcels_layer, surveys_added)

    print('Removing temporary join...')
    arcpy.management.RemoveJoin(parcels_layer)

    # join output carries a 'csv_table_' prefix on joined field names
    print('Stripping csv_table_ prefixes...')
    strip_prefix_fields(surveys_added, prefix='csv_table_', collision_suffix='_ACC')

    input_record_count = arcpy.management.GetCount(surveys_added)
    print(f'Input table surveys_added has {input_record_count} records')

    # target feature class
    survey_plat = r'V:\\gislu\\_DatabaseConnection\\editgis1.lu.user.dc.sde\\lu.LU.SURV_SurveyPlat'
    # diagnostics for the insert failures under investigation
    print('Schema lock test (target):', arcpy.TestSchemaLock(survey_plat))
    print('Workspace type:', getattr(arcpy.Describe(os.path.dirname(survey_plat)), 'workspaceType', None))
    desc_fc = arcpy.Describe(survey_plat)
    print('isVersioned:', getattr(desc_fc, 'isVersioned', None))
    print('isBranchVersioned:', getattr(desc_fc, 'isBranchVersioned', None))

    #---------------------------------------------------------------------------
    fields = [
        'SURVEY_ID', 'SURVEY_TYP', 'SURVEY_DES', 'SURVEY_YR',
        'PLSS_COORD', 'LINK', 'PARCEL_NO'
    ]
    # Build explicit source and target field lists including SHAPE@
    src_fields = fields + ['SHAPE@']
    tgt_fields = fields + ['SHAPE@']
    sde_workspace = r'V:\gislu\_DatabaseConnection\editgis1.lu.user.dc.sde'
    edit = arcpy.da.Editor(sde_workspace)

    # Inspect field types on both sides before inserting
    tgt_field_types = {f.name: f.type for f in arcpy.ListFields(survey_plat)}
    print('Target field types:', tgt_field_types)
    src_field_types = {f.name: f.type for f in arcpy.ListFields(surveys_added)}
    print('Source field types:', src_field_types)

    try:
        # open edit session (undo/redo enabled, multiuser mode)
        edit.startEditing(True, True)
        edit.startOperation()
        inserted = 0
        skipped = 0
        with (
            arcpy.da.SearchCursor(surveys_added, src_fields) as s_cur,
            arcpy.da.InsertCursor(survey_plat, tgt_fields) as i_cur,
        ):
            for row in s_cur:
                try:
                    i_cur.insertRow(row)
                    inserted += 1
                except Exception as row_error:
                    # fixed: the arcpy function is GetMessages, not
                    # getMessages - the old call raised AttributeError
                    # inside the handler and masked the real insert error
                    print(arcpy.GetMessages())
                    print(f'ERROR - Could not insert row: {row_error}')
                    skipped += 1
        print(f'Inserted: {inserted} | Skipped: {skipped}')
        edit.stopOperation()
        edit.stopEditing(True)
    except Exception:
        # roll back safely on error
        if edit.isEditing:
            edit.stopOperation()
            edit.stopEditing(False)
        raise
    #---------------------------------------------------------------------------

    # formatted email body
    report_processed = f'Surveys processed: {len(process_surveys)}\n'
    for survey in process_surveys:
        report_processed += f"{survey['survey_id']} {survey['parcel_id']}\n"

    report_missing = f'Surveys missing a parcel id: {len(missing_surveys)}\n'
    for survey in missing_surveys:
        report_missing += f"{survey['survey_id']}\n"

    report_multi = f'Surveys with multiple parcel ids: {len(multi_surveys)}\n'
    for survey in multi_surveys:
        report_multi += f"{survey['survey_id']} {survey['parcel_id']}\n"

    report_null = f'Surveys with NULL values: {len(surveys_with_null_values)}\n'
    for survey in surveys_with_null_values:
        survey_id = 'NULL' if survey['survey_id'] is None else survey['survey_id']
        report_null += f"{survey_id} {survey['parcel_id']}\n"

    # assembly of email body
    email_body = 'Please review following land survey plats:\n\n'
    email_body += f'{report_processed}\n'
    email_body += f'{report_missing}\n'
    email_body += f'{report_multi}\n'
    email_body += f'{report_null}'

    send_gmail(
        recipient='mlauer@bouldercounty.gov',
        subject='Survey Plat Report - ' + formatted_date,
        body=email_body
    )
    send_gmail(
        recipient='sgambrel@bouldercounty.gov',
        subject='Survey Plat Report - ' + formatted_date,
        body=email_body
    )

    run_time = time.time() - start_time
    print(f'Runtime: {run_time:.2f} seconds')
################################################################################
## END MAIN
################################################################################
def strip_prefix_fields(feature_class, prefix='csv_table_', collision_suffix='_ACC'):
    """
    Strip `prefix` from every field name in `feature_class` that carries it.
    If the stripped name is already taken, `collision_suffix` plus an
    incrementing number is appended until the name is unique.
    """
    # Lowercased set of names currently in use, for collision checks
    taken = {field.name.lower() for field in arcpy.ListFields(feature_class)}
    for field in arcpy.ListFields(feature_class):
        current = field.name
        if not current.startswith(prefix):
            continue
        stripped = current[len(prefix):]
        candidate = stripped
        # Collision: append suffix + counter until the name is free
        if candidate.lower() in taken:
            counter = 1
            while f'{stripped}{collision_suffix}{counter}'.lower() in taken:
                counter += 1
            candidate = f'{stripped}{collision_suffix}{counter}'
        # Rename the field, keeping the alias in sync with the new name
        arcpy.management.AlterField(
            in_table=feature_class,
            field=current,
            new_field_name=candidate,
            new_field_alias=candidate
        )
        # Keep the collision set current after the rename
        taken.add(candidate.lower())
        taken.discard(current.lower())
        print(f"Renamed field: {current} -> {candidate}")
def rename_reorder_df(df, rename_map=None, column_order=None):
    """Return a copy of `df`, optionally renaming columns via `rename_map`
    and then selecting/reordering them per `column_order`."""
    out = df.copy()
    if rename_map:
        out = out.rename(columns=rename_map)
    return out[column_order] if column_order else out
def append_df_to_feature_class(df, feature_class, field_map=None):
    """Insert rows of `df` into `feature_class`, using only the columns that
    match an editable field (not OID, not geometry, not required).

    Raises ValueError when no column names line up at all.
    """
    if field_map:
        df = df.rename(columns=field_map)
    # Editable attribute fields on the target
    editable = [
        field.name for field in arcpy.ListFields(feature_class)
        if field.type not in ('OID', 'Geometry') and not field.required
    ]
    matched = [column for column in df.columns if column in editable]
    if not matched:
        raise ValueError('No matching fields found between df and fc')
    count = 0
    with arcpy.da.InsertCursor(feature_class, matched) as cursor:
        for record in df[matched].itertuples(index=False, name=None):
            cursor.insertRow(record)
            count += 1
    print(f'Rows inserted: {count}')
# InsertCursor -----------------------------------------------------------------
def resolve_joined_field(source_fc, target_name):
    """
    Return the actual field name in source_fc that corresponds to
    target_name, tolerating join prefixes like `csv_table_` or `parcels_`.
    Raises RuntimeError when nothing matches.
    """
    names = [field.name for field in arcpy.ListFields(source_fc)]
    # prefer an exact match
    if target_name in names:
        return target_name
    # otherwise accept any xxx_TARGETNAME style suffix match
    wanted = target_name.lower()
    suffix_matches = [name for name in names if name.lower().endswith(wanted)]
    if not suffix_matches:
        raise RuntimeError(f'Could not resolve source field for: {target_name}')
    # shortest candidate == least-prefixed name
    return min(suffix_matches, key=len)
# End InsertCursor functions ---------------------------------------------------
def query_accela(connection_string, query):
    """Run `query` against Accela and return the rows as a list of dicts
    keyed by column name.

    Returns [] on any database or unexpected error (logged to stdout).
    """
    connection = None
    try:
        connection = pyodbc.connect(connection_string)
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            # column names from the cursor metadata
            keys = [column[0] for column in cursor.description]
            return [dict(zip(keys, row)) for row in cursor]
        finally:
            cursor.close()
    except pyodbc.Error as e:
        print(f'Database error: {e}')
        return []
    except Exception as e:
        print(f'Error: {e}')
        return []
    finally:
        # fixed: the connection previously leaked whenever execute() raised,
        # leaving a dangling ODBC session on the server
        if connection is not None:
            connection.close()
def return_by_parcel_id(query_returns):
    """Bucket Accela rows by the quality of their parcel id.

    A parcel id starting with '0' marks a placeholder ("missing") id, and
    one longer than 12 characters holds multiple concatenated ids.

    Fixes vs. the previous version:
    - a NULL parcel_id no longer raises AttributeError on .startswith();
      such rows are treated as missing an id.
    - an empty `query_returns` no longer returns False (which crashed the
      caller's subscripting); the three-list dict is always returned.
    """
    buckets = {'surveys': [], 'multi_id': [], 'missing_id': []}
    for survey in query_returns:
        parcel_id = survey['parcel_id']
        if parcel_id is None or parcel_id.startswith('0'):
            buckets['missing_id'].append(survey)
        elif len(parcel_id) > 12:
            buckets['multi_id'].append(survey)
        else:
            buckets['surveys'].append(survey)
    return buckets
def send_gmail(recipient, subject, body):
    """Send a plain-text email to `recipient` via Gmail SMTP-over-SSL.

    Errors are caught and logged to stdout; nothing is raised to the caller.
    """
    gmail_sender = REMOVED
    gmail_app = REMOVED
    try:
        msg = MIMEMultipart()
        # create email
        msg['From'] = gmail_sender
        msg['To'] = recipient
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))
        # fixed: the SMTP connection leaked when login/send raised; the
        # context manager quits and closes the socket on every path
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.ehlo()
            server.login(gmail_sender, gmail_app)
            server.sendmail(gmail_sender, recipient, msg.as_string())
        print(f'Email sent to: {recipient}')
    except Exception as e:
        print(f'Error: {e}')
################################################################################
## END OF ALL FUNCTIONS
################################################################################
# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
line number? for the append step
a copy of the error message, if there was one might be useful
Hi Dan,
Sorry, we're no longer using the append (there has been A LOT of trial and error here), and I've updated the post. Script runs until around line 359.
That's kind of a weird way to handle cursor contexts. I'd wrap both in parens instead of using a \ line continuation.
Line continuations are always best avoided since a sneaky whitespace character after the \ will break them and that's almost impossible to find.
Also, the get messages function for arcpy is arcpy.GetMessages() not arcpy.getMessages()
You also never delete your edit session, you should always run an edit session using a with block since a crash out could leave that session active and cause issues down the line since the locks aren't properly cleaned up.
Here's that surrounding code with those edits:
# Build explicit source and target field lists including SHAPE@
src_fields = fields + ['SHAPE@']
tgt_fields = fields + ['SHAPE@']
sde_workspace = r'V:\gislu\_DatabaseConnection\editgis1.lu.user.dc.sde'
# Inspect target field types
tgt_field_types = {f.name: f.type for f in arcpy.ListFields(survey_plat)}
print('Target field types:', tgt_field_types)
src_field_types = {f.name: f.type for f in arcpy.ListFields(surveys_added)}
print('Source field types:', src_field_types)
inserted = 0
skipped = 0
with (
arcpy.da.Editor(sde_workspace) as edit,
arcpy.da.SearchCursor(surveys_added, src_fields) as s_cur,
arcpy.da.InsertCursor(survey_plat, tgt_fields) as i_cur,
):
edit.startEditing(True, True)
edit.startOperation()
for row in s_cur:
try:
i_cur.insertRow(row)
inserted += 1
except Exception as row_error:
print(arcpy.GetMessages())
print(f'ERROR - Could not insert row: {row_error}')
skipped += 1
print(f'Inserted: {inserted} | Skipped: {skipped}')
edit.stopOperation()
edit.stopEditing(True)
A few things,
Add this before the cursor block:
# Debug - check actual field names
actual_fields = [f.name for f in arcpy.ListFields(surveys_added)]
print("Actual fields in surveys_added:", actual_fields)
print("Fields we're trying to read:", src_fields)
The join table prefix might be different now (e.g., csv_table_ vs TableFromAccela_ vs something else).
Try this simpler approach - skip the rename and direct field mapping:
# After creating surveys_added, map fields explicitly:
field_mapping = {
    'csv_table_SURVEY_ID': 'SURVEY_ID',
    'csv_table_SURVEY_TYP': 'SURVEY_TYP',
    # ... etc
}
# Use the mapped names in your cursor
src_fields = list(field_mapping.keys()) + ['SHAPE@']
tgt_fields = list(field_mapping.values()) + ['SHAPE@']
Move the cursor OUTSIDE the edit session:
# FIRST: test reading source
with arcpy.da.SearchCursor(surveys_added, src_fields) as s_cur:
rows = list(s_cur)
# THEN open edit session
edit.startEditing(True, True)
edit.startOperation()
with arcpy.da.InsertCursor(survey_plat, tgt_fields) as i_cur:
for row in rows:
            i_cur.insertRow(row)

You say there was a change in the database that broke it.
What was the change?
R_
I took a shot at cleaning up the logic a bit. There's a ton of joining and pivoting you don't really need to do since you're just using an InsertCursor at the end of the processing. Definitely just use this as a reference to validate your logic since the overall structure is totally different, but sometimes having a different implementation can help you find things:
#-------------------------------------------------------------------------------
# Name: SurveyPlatAutomation
# Purpose: This script identifies surveys marked complete in Accela and grabs
# selected information for each from Accela. The parcel ID from
# Accela is used to select a parcel, copy its geometry to the Survey
# Plat layer in editgis, and populate the attribute table with the
# Accela field information.
#
# Author: mlauer@bouldercounty.gov
#
# Created: 01/21/2025
#
# Updated: 01/21/2026 - sent from Sean
# 01/28/2026 - updated connection string/query for new server
# 02/09/2026 - fixed sql query, tried arcpy Editor class
# 02/17/2026 - Editor crashed at start of run, trying InsertCursor
# 03/04/2026 - format dataframe to match feature class exactly
# 04/06/2026 - check for datum errors
# 04/16/2026 - strip csv table prefix from table before insert
# 04/21/2026 - rewrite with some example blocks (NOT READY FOR PRODUCTION)
#-------------------------------------------------------------------------------
# Set this to True once you've reviewed the code and want to test it
ALLOW_RUN = False
from datetime import date, datetime
from pathlib import Path
import time
from typing import Literal, TypedDict
import arcpy
import pyodbc
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
#----------------------------------------------------------------------------#
# Globals #
#----------------------------------------------------------------------------#
NOTIFICATIONS = [
'mlauer@bouldercounty.gov',
'sgambrel@bouldercounty.gov'
]
PLAT_PROCESSING_DIR = Path(r'V:\prjlu\SurveyPlats\PlatProcessingAutomation')
SDE_DIRECTORY = Path(r'V:\gislu\_DatabaseConnection')
PARCEL_DATABASE = SDE_DIRECTORY / 'regis1.boco.gis.dc.sde'
SURVEY_PLAT_DATABASE = SDE_DIRECTORY / 'editgis1.lu.user.dc.sde'
MAP_DIRECTORY = Path(r'V:\ScannedMaps\SURVEY\LANDSURVEY')
CONNECTION_STRING = (
'<REMOVED>'
)
ACCELA_QUERY = (
"""
SELECT *
FROM OPENQUERY([BOCO-ASYNC-PROD-DB.US.ERDB.ACCELA.COM,14332], '
SELECT V_RECORD.RECORD_ID AS survey_id
,V_RECORD.RECORD_TYPE AS docket_type
,V_RECORD.DATE_OPENED AS app_date
,V_RECORD.SHORT_NOTES AS document_title
,V_RECORD.[DESCRIPTION] AS description
,V_PARCEL.PARCEL_NBR AS parcel_id
,V_RECORD.ADDR_FULL_LINE# AS address
FROM V_RECORD
JOIN V_PARCEL ON V_RECORD.RECORD_ID = V_PARCEL.RECORD_ID
WHERE V_RECORD.DATE_OPENED >= ''2025-01-01''
AND V_RECORD.RECORD_TYPE = ''Land Survey Plat''
AND V_RECORD.[STATUS] = ''Accepted Completed''
')
"""
)
DEFAULT_TITLE = '_NO_TITLE_'
DEFAULT_YEAR = '0000'
DEFAULT_DESCRIPTION = '_NO_DESCRIPTION_'
DEFAULT_SURVEY_LAST_4 = 'xyzx'
class SurveyRecord(TypedDict):
    """One row returned by the Accela OPENQUERY; any value may be NULL."""
    survey_id: str | None
    docket_type: str | None
    app_date: datetime | None
    document_title: str | None
    description: str | None
    parcel_id: str | None
    address: str | None
class NewSurveyRecord(TypedDict):
    """Attribute row inserted into SURV_SurveyPlat; geometry is supplied
    separately via SHAPE@ when the row is written."""
    SURVEY_ID: str | None
    SURVEY_TYP: str | None
    SURVEY_DES: str | None
    SURVEY_YR: str | None
    PLSS_COORD: Literal['N'] | None
    LINK: str | None
    PARCEL_NO: str | None
REQUIRED_FIELDS = list(NewSurveyRecord.__required_keys__)
def main():
    """Pull completed surveys from Accela, insert matching parcel geometry
    plus attributes into the Survey Plat feature class, and email a report.
    """
    start_time = time.time()
    formatted_date = date.today().strftime('%m/%d/%Y')

    surveys = query_accela(CONNECTION_STRING, ACCELA_QUERY)
    if not surveys:
        print('No Surveys Found!')
        run_time = time.time() - start_time
        print(f'Runtime: {run_time:.2f} seconds')
        return

    split_surveys = return_by_parcel_id(surveys)
    process_surveys = split_surveys['surveys']
    missing_surveys = split_surveys['missing_id']
    multi_surveys = split_surveys['multi_id']

    # parcel geometry keyed by parcel number, read once up front
    existing_records: dict[str, arcpy.Polygon] = {
        parcel_no: shape
        for parcel_no, shape
        in arcpy.da.SearchCursor(
            str(PARCEL_DATABASE / 'boco.PARCELS.PARCELS'),
            ['PARCEL_NO', 'SHAPE@']
        )
    }

    new_surveys = [
        process_accela_survey(survey)
        for survey in process_surveys
        if has_required(survey)
        and survey['parcel_id'] in existing_records
    ]
    null_surveys: list[SurveyRecord] = [
        survey for survey in process_surveys
        if not has_required(survey)
        or survey['parcel_id'] not in existing_records  # Separate group?
    ]

    input_record_count = len(new_surveys)
    print(f'Input table surveys_added has {input_record_count} records')

    # target feature class
    survey_plat = SURVEY_PLAT_DATABASE / 'lu.LU.SURV_SurveyPlat'
    waited = 0
    while not arcpy.TestSchemaLock(str(survey_plat)):
        if waited > 60:
            # Timeout after a minute
            raise TimeoutError(f'Unable to obtain schema lock for {survey_plat.name} within 60 seconds')
        print(f'Waiting for schema lock ({waited}s/60s)...')
        time.sleep(1)
        waited += 1

    with (
        arcpy.da.Editor(str(SURVEY_PLAT_DATABASE)),
        arcpy.da.InsertCursor(str(survey_plat), ['SHAPE@'] + REQUIRED_FIELDS) as cur
    ):
        for new_survey in new_surveys:
            if not new_survey['PARCEL_NO']:  # This should never happen
                print(f'Skipping Processed Survey Record with no associated parcel number: {new_survey}')
                continue
            shape = existing_records[new_survey['PARCEL_NO']]
            # fixed: cur.fields begins with 'SHAPE@', which is not a key of
            # new_survey, so `new_survey[field] for field in cur.fields`
            # raised KeyError on every row - build the attribute portion
            # from REQUIRED_FIELDS (the cursor's attribute field order)
            cur.insertRow([shape] + [new_survey[field] for field in REQUIRED_FIELDS])

    email_body = build_email_body(process_surveys, missing_surveys, multi_surveys, null_surveys)
    for recipient in NOTIFICATIONS:
        send_gmail(
            recipient=recipient,
            subject=f'Survey Plat Report - {formatted_date}',
            body=email_body)

    run_time = time.time() - start_time
    print(f'Runtime: {run_time:.2f} seconds')
#---------------------------------------------------------------------------
def build_email_body(
    process_surveys: list[SurveyRecord],
    missing_surveys: list[SurveyRecord],
    multi_surveys: list[SurveyRecord],
    null_surveys: list[SurveyRecord],
) -> str:
    """Assemble the plain-text report email from the four survey buckets.

    Each section is a count header followed by one line per survey.
    """
    # NOTE: Should probably add a body component to show total number of rows inserted
    id_and_parcel = ['survey_id', 'parcel_id']
    # fixed: a trailing comma previously turned report_processed into a
    # TUPLE, so its repr - not its text - ended up in the email body.
    # (Hoisting the field lists also avoids quote-in-f-string syntax that
    # only parses on Python 3.12+.)
    report_processed = (
        f'Surveys processed: {len(process_surveys)}\n'
        + get_info(process_surveys, id_and_parcel)
    )
    # prep str for surveys missing a parcel id
    report_missing = (
        f'Surveys missing a parcel id: {len(missing_surveys)}\n'
        + get_info(missing_surveys, ['survey_id'])
    )
    # prep str for surveys with multiple parcel ids
    report_multi = (
        f'Surveys with multiple parcel ids: {len(multi_surveys)}\n'
        + get_info(multi_surveys, id_and_parcel)
    )
    # prep str for surveys with nonetype values
    report_null = (
        f'Surveys with NULL values: {len(null_surveys)}\n'
        + get_info(null_surveys, id_and_parcel)
    )
    # assembly of email body
    email_body = 'Please review following land survey plats:\n\n'
    email_body += f'{report_processed}\n'
    email_body += f'{report_missing}\n'
    email_body += f'{report_multi}\n'
    email_body += f'{report_null}'
    return email_body
def get_info(accela_surveys: list[SurveyRecord], fields: list[str]) -> str:
    """One line per survey: the requested field values space-separated,
    with 'NULL' standing in for missing/falsy values."""
    lines = []
    for survey in accela_surveys:
        values = [str(survey.get(field) or 'NULL') for field in fields]
        lines.append(' '.join(values))
    return '\n'.join(lines)
def has_required(accela_survey: SurveyRecord, required: list[str] | Literal['all', 'none'] = 'all') -> bool:
    """True when the requested fields of `accela_survey` are all truthy.

    `required` is either 'all' (every value must be truthy), 'none'
    (always True), or an explicit list of field names to check.
    """
    if required == 'none':
        return True
    if required == 'all':
        return all(accela_survey.values())
    return all(accela_survey.get(field) for field in required)
def process_accela_survey(accela_survey: SurveyRecord) -> NewSurveyRecord:
    """Map one Accela row onto the Survey Plat attribute schema.

    NULL source values fall back to the module DEFAULT_* placeholders.
    """
    new_survey: NewSurveyRecord = {
        'SURVEY_ID': accela_survey['survey_id'],
        'SURVEY_TYP': None,
        'SURVEY_DES': None,
        'SURVEY_YR': None,
        'PLSS_COORD': 'N',  # always 'N' for these records
        'LINK': None,
        'PARCEL_NO': accela_survey['parcel_id'],
    }
    # survey type from the first four characters of the document title
    type_map = {
        'IMPR': 'Improvement Survey Plat',
        'ALTA': 'Land Survey Plat',
        'LAND': 'Land Survey Plat',
    }
    title = accela_survey['document_title'] or DEFAULT_TITLE
    new_survey['SURVEY_TYP'] = type_map.get(title[:4], 'Other Type of Plat')

    survey_descr = accela_survey['description'] or DEFAULT_DESCRIPTION
    # fixed: TOWNSHIP/RANGE need the trailing space (as in the original
    # script) - without it 'TOWNSHIP 1N' became 'T 1N' instead of 'T1N'.
    # Also use the DEFAULT_DESCRIPTION constant instead of its literal.
    descr_map = {
        'SECTION ': 'SEC',
        'SEC. ': 'SEC',
        'SEC ': 'SEC',
        'TOWNSHIP ': 'T',
        'RANGE ': 'R',
        'QUARTER': '1/4',
        # NOTE: NORTHEAST etc. will be replaced as NORTH -> N EAST -> E
        'NORTH': 'N',
        'SOUTH': 'S',
        'EAST': 'E',
        'WEST': 'W'
    }
    for original, abbreviation in descr_map.items():
        if original in survey_descr:
            survey_descr = survey_descr.replace(original, abbreviation)

    final_description = title + ', ' + survey_descr
    new_survey['SURVEY_DES'] = final_description[:254]  # target field caps at 255
    new_survey['SURVEY_YR'] = str(accela_survey['app_date'].year) if accela_survey['app_date'] else DEFAULT_YEAR

    # link to the scanned pdf: <yy><last-4-of-id>-01.pdf
    year_last_2 = new_survey['SURVEY_YR'][-2:]
    survey_id_last_4 = accela_survey['survey_id'][-4:] if accela_survey['survey_id'] else DEFAULT_SURVEY_LAST_4
    suffix = '-01'
    file_ext = '.pdf'
    filename = year_last_2 + survey_id_last_4 + suffix + file_ext
    new_survey['LINK'] = str(MAP_DIRECTORY / filename)
    return new_survey
def query_accela(connection_string: str, query: str) -> list[SurveyRecord]:
    """Run `query` against Accela and return rows as dicts keyed by column
    name; returns [] on any error (logged to stdout)."""
    try:
        # fixed: chaining .connect(...).cursor() leaked the connection -
        # the cursor context manager closes only the cursor, and pyodbc's
        # Connection context manager commits/rolls back without closing.
        connection = pyodbc.connect(connection_string)
        try:
            with connection.cursor() as cursor:
                cursor.execute(query)
                keys = [k[0] for k in cursor.description]
                return [dict(zip(keys, row)) for row in cursor]  # type: ignore
        finally:
            connection.close()
    except pyodbc.Error as e:
        print(f'Database error: {e}')
    except Exception as e:
        print(f'Error: {e}')
    return []
_ReturnKeys = Literal['surveys', 'multi_id', 'missing_id']
def return_by_parcel_id(query_returns: list[SurveyRecord]) -> dict[_ReturnKeys, list[SurveyRecord]]:
    """Bucket Accela rows by parcel-id quality.

    Ids starting with '0' are placeholders ('missing_id'); ids longer than
    12 characters hold multiple parcels ('multi_id'); everything else -
    including NULL ids - lands in 'surveys'.
    """
    buckets: dict[_ReturnKeys, list[SurveyRecord]] = {
        'surveys': [],
        'multi_id': [],
        'missing_id': [],
    }
    for survey in query_returns:
        parcel_id = survey['parcel_id']
        if parcel_id and parcel_id.startswith('0'):
            buckets['missing_id'].append(survey)
        elif parcel_id and len(parcel_id) > 12:
            buckets['multi_id'].append(survey)
        else:
            buckets['surveys'].append(survey)
    return buckets
def send_gmail(recipient: str, subject: str, body: str):
    """Send a plain-text email to `recipient` via Gmail SMTP-over-SSL.

    Errors are caught and logged to stdout; nothing is raised to the caller.
    """
    gmail_sender = '<REMOVED>'
    gmail_app = '<REMOVED>'
    try:
        msg = MIMEMultipart()
        # create email
        msg['From'] = gmail_sender
        msg['To'] = recipient
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))
        # fixed: the SMTP connection leaked when login/send raised; the
        # context manager quits and closes the socket on every path
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.ehlo()
            server.login(gmail_sender, gmail_app)
            server.sendmail(gmail_sender, recipient, msg.as_string())
        print(f'Email sent to: {recipient}')
    except Exception as e:
        print(f'Error: {e}')
if __name__ == '__main__':
    # ALLOW_RUN stays False until this rewrite has been reviewed, so an
    # accidental execution can't touch production data.
    if ALLOW_RUN:
        main()