
Best practices for programmatically mirroring complete weekly CSV drops (inserts, updates, deletes) to a registered Enterprise GDB table?

Friday
TylerT
Frequent Contributor

Hi everyone,

I’m looking for some data flow advice on the most efficient and robust way to update registered Enterprise Geodatabase tables (hosted in SQL Server) from a weekly CSV data drop.

The Context:
Each week, I receive a large data drop (80+ CSVs). These CSVs represent the complete and current state of the data. Because it's a full-state drop, the synchronization process needs to handle inserts for new records, updates for changed records, and, importantly, deletes for records that exist in the database but are no longer present in the incoming CSV. My goal is to mirror this data into non-spatial tables that are registered with our Enterprise Geodatabase. These tables will not need to be edited outside the weekly mirror process.

What We’ve Tried:
We initially built a custom, high-performance ETL pipeline using Python. Our workflow looked like this:

  1. Used DuckDB to extract and filter the CSVs directly from disk into memory-efficient chunks.

  2. Formatted the data into standard Python tuples.

  3. Used pyodbc to load the data into a temporary #Staging table in SQL Server.

  4. Executed a raw T-SQL MERGE statement to upsert the data and delete missing rows, explicitly ignoring Esri-managed fields like OBJECTID and GlobalID so we wouldn't overwrite them.

The Problem:
After testing, it became clear that performing raw SQL operations against an Esri-registered table is problematic. Bypassing the ArcGIS application tier to perform these upserts and deletes does not seem to play well with the underlying geodatabase mechanics (presumably related to how SDE manages IDs, indexes, versioning, or archiving). It seems that Esri-managed tables are best maintained with Esri tools, which account for these intricacies.

Potential Approaches:
If I have a CSV representing the absolute current state of a table, how best can I mirror that to a registered enterprise geodatabase table programmatically? I've been weighing a few different paths:

  • ArcGIS API for Python (Portal REST Endpoints): Bypassing the direct database connection entirely and using the arcgis library to interact with the published table's REST endpoint. Calculating the deltas in memory and passing them as adds, updates, and deletes via edit_features(). Does this application-tier approach perform well at scale compared to local arcpy operations?

  • Truncate and Load: Would truncating the table and simply appending the new CSV be the safest way to inherently handle the deletes? My concern here is whether this breaks existing relationship classes, web map pop-ups, or GlobalID dependencies downstream since the underlying IDs would constantly regenerate.

  • arcpy.management.Append with the upsert parameter: This seems great for the inserts and updates, but it doesn't natively handle deleting the records that have dropped off the source CSV.

  • arcpy.da Cursors: Relying on an UpdateCursor/InsertCursor script comparing dictionaries to calculate the delta and process the inserts, updates, and deletes row-by-row. Is this too slow for hundreds of thousands of rows?

Has anyone built a reliable, programmatically scheduled pipeline for this exact scenario? Any advice on which Esri tools, APIs, or Python libraries yield the most stable results for a true mirror (handling deletes safely) would be hugely appreciated!

Thanks in advance!

5 Replies
VinceAngelo
Esri Esteemed Contributor

I've done this several times now, once with hundreds of thousands of "changed" records twice daily (where only 10% are actually new), and once with 28 million records in each delivery (full dataset).

What I settled on was storing SHA-1 hashes of the data rows in a parallel table in the database.  The loader walks the tables, loading all the digest records first, then hashing the input stream with the same algorithm.  If the key column(s) are not present in the dictionary of hashes, it's an INSERT; if the key exists and the SHA-1 is the same, that's a No-Op, and I delete the key from the dictionary; if the key exists but the SHA-1 is different, I tag it as an update, then also delete the dictionary key. Once I've processed all rows, any keys that are left are my delete candidates.
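
The classification pass described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the actual loader: `row_digest`, `classify_rows`, and the unit-separator join are hypothetical, rows are assumed to arrive as (key, values) pairs, and keys are assumed to be unique within a delivery.

```python
import hashlib


def row_digest(values):
    """SHA-1 over a row's values; None and '' hash the same here (an assumption)."""
    joined = "\x1f".join("" if v is None else str(v) for v in values)
    return hashlib.sha1(joined.encode("utf-8")).hexdigest()


def classify_rows(stored_digests, incoming_rows):
    """Split incoming rows into inserts and updates; leftover keys are deletes.

    stored_digests: dict mapping key -> digest, loaded from the digest table.
    incoming_rows:  iterable of (key, values) pairs from the new delivery.
    """
    remaining = dict(stored_digests)  # work on a copy; matched keys are removed
    inserts, updates = [], []
    for key, values in incoming_rows:
        digest = row_digest(values)
        if key not in remaining:
            inserts.append((key, values, digest))
        else:
            if remaining[key] != digest:
                updates.append((key, values, digest))
            del remaining[key]  # seen in this delivery, so not a delete candidate
    deletes = sorted(remaining)  # keys never seen in the incoming data
    return inserts, updates, deletes
```

Unchanged rows fall through without being recorded anywhere, which is what keeps the write volume proportional to the real delta rather than to the size of the delivery.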

Initially, I just walked the UPDATE and DELETE lists, but that sometimes made for hundreds of thousands of UpdateCursor calls, and running a full scan on 20+ million rows to change 200K wasn't cost-effective.  So I tweaked the initial solution to write records to a "joiner" table, with the key and 'U' or 'D', then ran one UpdateCursor on the table, with a "WHERE keycol in (SELECT keycol FROM joiner_tab)" query.  Then I could join the joiner to the primary table and the digest table to manifest the changes in two DA cursor calls (one for each table).
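
A rough sketch of the single-pass idea, assuming the first cursor field is the key and that the replacement values are held in an in-memory dictionary (the post instead joins the joiner, primary, and digest tables, so treat this as a simplification). `joiner_where_clause` and `apply_changes` are hypothetical helper names; `updateRow` and `deleteRow` are the standard `arcpy.da.UpdateCursor` methods.

```python
def joiner_where_clause(key_field, joiner_table):
    """WHERE clause that restricts a cursor to rows listed in the joiner table."""
    return f"{key_field} IN (SELECT {key_field} FROM {joiner_table})"


def apply_changes(cursor, actions, new_values):
    """Walk one update cursor, deleting 'D' rows and rewriting 'U' rows.

    cursor:     an open arcpy.da.UpdateCursor whose first field is the key.
    actions:    dict mapping key -> 'U' or 'D' (same content as the joiner table).
    new_values: dict mapping key -> list of replacement values for the
                remaining cursor fields.
    """
    updated = deleted = 0
    for row in cursor:
        key = row[0]
        action = actions.get(key)
        if action == "D":
            cursor.deleteRow()
            deleted += 1
        elif action == "U":
            cursor.updateRow([key] + list(new_values[key]))
            updated += 1
    return updated, deleted


# Possible usage (requires arcpy; table and field names are placeholders):
#
#   import arcpy
#   where = joiner_where_clause("keycol", "joiner_tab")
#   with arcpy.da.UpdateCursor(table, ["keycol", "name"], where) as cursor:
#       apply_changes(cursor, actions, new_values)
```

Passing the cursor in, rather than opening it inside the function, makes the logic easy to exercise with a stand-in object before pointing it at a real table.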

Using this technique I preserve the map service integrity during modifications (one COMMIT per table), without the outage caused by a TRUNCATE/re-load.  Yeah, there might be occasional digest collisions that produce a false positive, but with similar data, collisions are exceedingly rare. This methodology is also highly reliable -- One of these systems has been running with zero maintenance (beyond database and ArcGIS Pro upgrades) for over five years.

In theory, the API for Python could do this through the Portal, but in practice, using an UpdateCursor in ArcPy is literally a thousand times faster (680 ms vs. 12 minutes for 39K rows) than having to process each row as a series of XML web requests through the Portal and on to the AGS host, then finally to the database.

I did this all with plain vanilla ArcPy. The only tricky part is dealing with values coming in as database rows and as ASCII text, since type preservation can be a challenge.  

I've also created parallel tables to insert all rows into, letting the database manage the conditional updates as a trigger, but that's better suited to a fixed number of records (states, countries, counties, zip codes,...).

Good luck!

- V

TylerT
Frequent Contributor

Hi @VinceAngelo,

Thank you so much for the detailed response! 

Before I completely refactor my architecture around this, I had a few quick clarifying questions on how you typically implement this:

  1. The "Joiner Table" Implementation: When you write the update/delete keys to the joiner_tab, are you creating/truncating a standard Geodatabase table, or are you using a native RDBMS temporary table (e.g., #Joiner in SQL Server)? If it's a native temp table, does the arcpy.da.UpdateCursor's SQL expression cleanly support subqueries against it within the same session?

  2. Handling Inserts: You mentioned using the UpdateCursor to process the updates and deletes. For the new records (keys not found in the dictionary), do you typically iterate through an arcpy.da.InsertCursor, or do you dump the new records into a staging table and use arcpy.management.Append for better bulk performance?

  3. Hash Consistency and NULLs: Since you warned about type preservation between ASCII text and database rows, do you have a preferred pattern for handling NULL values versus empty strings ("") when concatenating fields for the SHA-1 hash? I want to ensure I avoid false-positive updates caused by NULL/empty string mismatches.

Thanks again for pointing me in the right direction!

Tyler

VinceAngelo
Esri Esteemed Contributor

I've been using PostgreSQL for so long I've forgotten how to spell SQL Server. My joiner tables are regular tables with the same base name as the primary table (business table "source.table_name_t1", digest table "source.table_name_h1", joiner table "source.table_name_j1"). They all share the primary key column(s), but t1 has the row values, h1 has the row digest, and j1 has the  u_or_d column. I name the oid columns differently so the joined result is unambiguous.

INSERTs are executed immediately by an arcpy.da.InsertCursor.  There really isn't likely to be much performance gain by writing to an extra table, then appending from that, though I suppose it depends on the number of indexes in the primary table.  I have a further complication of needing to do this to a replica parent geodatabase, and using "INSERT INTO ... SELECT ..." with versioning would add unnecessary complexity.  Performance-wise, the query from the source DBMS is the long pole in my tent, and SHA-1 hashing is somewhat expensive. When less than 1% of each table is modified, most of the cost is input I/O and compute; actually writing new rows is comparatively rare.

Generally speaking, I don't permit empty strings to kludge around NOT NULL constraints, but I'm not always able to enforce that. For purposes of hashing, I'd recommend treating an empty field in a CSV as equivalent to a NULL. I have a funky table or two where I have to backtrack to enforce NOT NULL on strings by replacing None with ''  before INSERT, but doing it that way makes me feel dirty.
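
A minimal sketch of that recommendation, assuming the CSV is read with the standard `csv` module (the helper names `normalize_field` and `read_normalized` are hypothetical): empty fields are mapped to None before any hashing or comparison, so they line up with database NULLs.

```python
import csv
import io


def normalize_field(text):
    """Map an empty CSV field to None so it compares equal to a database NULL."""
    return None if text == "" else text


def read_normalized(csv_text):
    """Yield each CSV data row as a dict with empty fields replaced by None."""
    for record in csv.DictReader(io.StringIO(csv_text)):
        yield {name: normalize_field(value) for name, value in record.items()}
```

Whether whitespace-only fields should also collapse to None is a separate decision that this sketch leaves alone.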

FWIW, I make extensive use of function dictionaries, so my encoder functions are placed in a dictionary by target datatype, and then they can massage oddities without harming the data stream. The code below handles NULL and empty string differently (a single 0 byte vs. a single ETX control character byte):

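import datetime
import hashlib
import struct

# VERBOSE_ENABLED and isoFmt are settings defined elsewhere in the full script.

def pack_str(v,n):
    """NULL packs to a single 0 byte; text packs to its UTF-8 bytes plus ETX."""
    if v is None: return b'\x00'
    ba = bytearray()
    if isinstance(v, datetime.datetime):
        # A datetime arriving in a string column is rendered as ISO text
        if VERBOSE_ENABLED:
            print("     {:8d}: Expected str, got {:s}!".format(n,str(type(v))))
        v = v.strftime(isoFmt)
    elif not isinstance(v, str):
        # Any other non-string value is coerced with str()
        if VERBOSE_ENABLED:
            print("     {:8d}: Expected str, got {:s}!".format(n,str(type(v))))
        v = str(v)
    vt = v.rstrip(' ').encode('UTF-8')  # trailing blanks are not significant
    ba.extend(vt)
    ba.extend(b'\x03')  # ASCII End-Of-Text (ETX)
    return ba

def pack_i2(v,n):
    """Pack a 16-bit integer in native byte order; NULL packs to a single 0 byte."""
    if v is None: return b'\x00'
    return struct.pack('h',v)

def pack_i4(v,n):
    """Pack a 32-bit integer in native byte order; NULL packs to a single 0 byte."""
    if v is None: return b'\x00'
    return struct.pack('i',v)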
...
packer_by_type = { # Lookup via native 'udt' value
        'bool'      : pack_i2,
        'bpchar'    : pack_str,
        'bytea'     : None,         # BLOBs manifest as memoryview (unused)
        'char'      : pack_str,
        'date'      : pack_date,
        'float4'    : pack_f8,
        'float8'    : pack_f8,
        'geography' : pack_geom,
        'geometry'  : pack_geom,
        'int2'      : pack_i2,
        'int4'      : pack_i4,
        'int8'      : pack_i8,
        'numeric'   : pack_f8,
        'timestamp' : pack_date,
        'text'      : pack_str,
        'uuid'      : pack_str,
        'varchar'   : pack_str,
    }
    # Note: When object is instantiated, the packing function is added
    #       to a "packers" list
...
    def getHash(self,qRow):
        self.rowCount += 1
        ba1 = bytearray()
        for i,packer in enumerate(self.packers,start=self.keyCount):
            ba1 += packer(qRow[i],self.rowCount)
        return hashlib.new('sha1',ba1,usedforsecurity=False).hexdigest()

 

 

VinceAngelo
Esri Esteemed Contributor

Okay, so it's a bit more complicated to drive from the joiner table...  Because some of the tables have compound keys, I have to isolate only changed rows by joining the joiner back to the table in the subquery, returning table objectids:

WHERE objectid in (
    SELECT t.objectid
    FROM   schema.table_j1 j
    JOIN   schema.table_t1 t USING (keycol1,...,keycolN)
)

 

And if it's versioned, then you have to join to the versioned view:

WHERE objectid in (
    SELECT t.objectid
    FROM schema.table_j1     j 
    JOIN schema.table_t1_evw t USING (keycol1,...,keycolN)
)
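
For anyone generating these clauses from Python, a small builder might look like the sketch below. The function name and parameters are hypothetical. It deliberately swaps the `USING (...)` join for explicit `ON` terms, because `USING` is PostgreSQL syntax that SQL Server does not accept; the `_evw` suffix follows the versioned-view naming shown above.

```python
def changed_rows_where(table, joiner, key_columns, versioned=False, oid_field="objectid"):
    """Build a WHERE clause selecting the objectids of rows listed in the joiner.

    table:       qualified business table name, e.g. "schema.table_t1".
    joiner:      qualified joiner table name, e.g. "schema.table_j1".
    key_columns: key column names shared by both tables.
    versioned:   when True, join to the versioned view (table name + "_evw").
    """
    source = f"{table}_evw" if versioned else table
    on_terms = " AND ".join(f"j.{col} = t.{col}" for col in key_columns)
    return (
        f"{oid_field} IN ("
        f"SELECT t.{oid_field} FROM {joiner} j "
        f"JOIN {source} t ON {on_terms})"
    )
```

The resulting string is meant to be passed as the `where_clause` of the cursor, so only the changed rows are visited.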

- V

BrennanSmith1
Frequent Contributor

I'm less experienced than Vince in this, but I did just get a comparable ETL pipeline script working with arcpy/pandas. My data lived in our Esri staging portal environment and needed heavy formatting before being used to sync the enterprise SQL database datasets. Things will be a little different since you're starting with CSVs, but the general approach would be:

  • Use python to compile all your data into a local file geodatabase with tables that exactly mirror your SQL target. Just save these tables with a _yyyymmdd in the name, and periodically delete old ones as you see fit.
  • Specify the key unique field(s) that define a matching record in both datasets. Treat this as a set(). I indexed my dataframes on GlobalID, but you can use anything as long as it is unique. Now it's easy to use difference() and intersection() to know which rows need to be inserted, deleted, or updated.
  • Use arcpy.management.Append() to add in the new rows
  • Use arcpy.da.UpdateCursor() to handle updates and deletions. I found that the Append GP tool was incredibly slow for Upserts.

I chunked things into sets of up to 999 keys, like this:

# df_src and df_tgt are dataframe representations of the source and target data
# src_lyr is just a MakeTableView() of the source data
# source_key is our unique field
# target_fc is your SQL table
insert_keys = df_src.index.difference(df_tgt.index)
insert_list = insert_keys.tolist()
for i in range(0, len(insert_list), 999):
    chunk = insert_list[i:i+999]
    formatted_keys = ",".join([f"'{k}'" for k in chunk])
    where_clause = f"{source_key} IN ({formatted_keys})"
    arcpy.management.SelectLayerByAttribute(src_lyr, "NEW_SELECTION", where_clause)
    arcpy.management.Append(src_lyr, target_fc, "NO_TEST", field_mapping=fms)
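
The same set logic extends to deletes and updates. The sketch below uses plain Python sets instead of pandas indexes so it runs without extra packages; `key_deltas` and `chunked_in_clauses` are hypothetical helpers, and the quote doubling is an added safeguard (not in the snippet above) for keys that contain a single quote.

```python
def key_deltas(source_keys, target_keys):
    """Compare key sets: what to insert, what to delete, what to compare further."""
    source, target = set(source_keys), set(target_keys)
    return {
        "insert": sorted(source - target),   # in the new data only
        "delete": sorted(target - source),   # in the database only
        "compare": sorted(source & target),  # in both; may or may not have changed
    }


def chunked_in_clauses(field, keys, size=999):
    """Build one IN (...) clause per chunk of keys, doubling embedded quotes."""
    clauses = []
    for start in range(0, len(keys), size):
        chunk = keys[start:start + size]
        quoted = ",".join("'" + str(k).replace("'", "''") + "'" for k in chunk)
        clauses.append(f"{field} IN ({quoted})")
    return clauses
```

Each clause could then drive a selection or a cursor, in the same way `where_clause` is used in the loop above.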

 

If any of this sounds helpful / interesting I can share more of the code. There was a lot more that went into preparing the updates than the appends. Good luck!
