ETL Patterns Blog

Esri Frequent Contributor
‎01-23-2024 01:34 PM

In an earlier post I got my feet wet using DuckDB in a hosted notebook in ArcGIS Online. That post highlighted reading GeoParquet files in an object store to maintain a feature service. That's a powerful workflow, but it is much more likely your daily…
2 Comments
Esri Contributor

Hi @BruceHarold,

Thank you for all the great articles you’ve written about ETL; they’ve been really inspiring.

This particular one encouraged me to dive into the duck, especially with the recent addition of DuckDB support in the default Python environments in both ArcGIS Pro and ArcGIS Server.
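
As a quick sanity check, something like this should confirm that the bundled DuckDB is available and that the extensions the tool needs will install and load (a minimal sketch; the version output will vary by environment):

# Quick smoke test in the ArcGIS Pro / Server Python environment:
# confirm DuckDB imports and the httpfs + spatial extensions load.
import duckdb

print(duckdb.__version__)
con = duckdb.connect(":memory:")
for ext in ("httpfs", "spatial"):
    con.execute(f"INSTALL {ext};")
    con.execute(f"LOAD {ext};")
print(con.execute("SELECT 'quack' AS greeting").fetchone())
con.close()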

I wanted to share some of my own experiments and created this little “Quack” Python script tool just for fun (see attached). It uses a DuckDB-to-Arrow-to-ArcGIS pipeline, as I wanted to investigate the Arrow support in ArcGIS as well.

The idea is to share the tool as a Web Tool in ArcGIS Enterprise, offering an open way to load data into an app as an in-memory feature layer or table, with an option to also store the result as a hosted feature layer (handled by the Web Tool). Both Map Viewer and Experience Builder can work directly with in-memory data, which makes for a great user experience when using the Web Tool itself.
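
As a rough sketch of the client side, the published Web Tool could be called with the ArcGIS API for Python along these lines (the portal URL, service name, tool name, and parameter name below are placeholders and depend on how the tool is published):

# Rough client-side sketch: calling the published Web Tool with the
# ArcGIS API for Python. URL, credentials, tool and parameter names
# are placeholders that depend on how the tool is published.
from arcgis.gis import GIS
from arcgis.geoprocessing import import_toolbox

gis = GIS("https://myportal.example.com/portal", "username", "password")
quack_toolbox = import_toolbox(
    "https://myportal.example.com/server/rest/services/Quack/GPServer", gis=gis
)
result = quack_toolbox.quack(sql="SELECT 42 AS answer")
print(result)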

So, have fun — and please keep publishing

Regards,
Matej

I’m pasting it here as I don’t see an attach file button, sorry.

import duckdb
import arcpy
import pyarrow as pa

def add_esri_metadata(arrow_table: pa.Table, wkid: int = 4326) -> pa.Table:
    """Attach ArcGIS-compatible geometry metadata."""
    sr = arcpy.SpatialReference(wkid)
    wkt = sr.exportToString()

    new_fields = []
    for field in arrow_table.schema:
        metadata = {
            k if isinstance(k, bytes) else k.encode("utf-8"): (
                v if isinstance(v, bytes) else v.encode("utf-8")
            )
            for k, v in (field.metadata or {}).items()
        }

        # Match the same column names quack() treats as geometry below
        if field.name.lower() in ("geometry", "geom", "shape", "wkb_geometry", "the_geom"):
            metadata[b"esri.encoding"] = b"WKB"  # TODO: add support for more encodings
            metadata[b"esri.sr_wkt"] = wkt.encode("utf-8")

        if field.name == "OBJECTID":
            metadata[b"esri.oid"] = b"esri.int64"

        new_fields.append(pa.field(field.name, field.type, metadata=metadata))

    return pa.Table.from_arrays(
        arrow_table.columns,
        schema=pa.schema(new_fields, metadata=arrow_table.schema.metadata),
    )


def quack(sql, result="memory/quack"):
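    """Run a DuckDB SQL query and copy the Arrow result into an ArcGIS
    feature class or table (in-memory by default)."""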

    con = duckdb.connect(":memory:")

    for ext in ("httpfs", "spatial"):
        con.execute(f"INSTALL {ext};")
        con.execute(f"LOAD {ext};")

    # Execute the query and fetch the result as an Arrow table
    arr = con.execute(sql).arrow()

    # Check if any geometry column exists
    has_geometry = False
    geometry_types = ["geometry", "geom", "shape", "wkb_geometry", "the_geom"]

    # Check column names (case-insensitive)
    column_names = [col.lower() for col in arr.column_names]

    # Check for geometry column by name
    for geom_type in geometry_types:
        if geom_type in column_names:
            has_geometry = True
            break

    # Also check schema for spatial types (DuckDB spatial extension)
    if not has_geometry:
        for field in arr.schema:
            field_type_str = str(field.type).lower()
            if any(
                spatial_type in field_type_str
                for spatial_type in [
                    "geometry",
                    "point",
                    "linestring",
                    "polygon",
                    "multipoint",
                    "multilinestring",
                    "multipolygon",
                    "geometrycollection",
                ]
            ):
                has_geometry = True
                break

    if arcpy.Exists(result):
        arcpy.AddMessage(f"Output {result} already exists - using Delete")
        arcpy.management.Delete(result)

    # Use appropriate tool based on geometry presence
    if has_geometry:
        arcpy.AddMessage("Geometry detected - using CopyFeatures")
        arcpy.management.CopyFeatures(add_esri_metadata(arr), result)
    else:
        arcpy.AddMessage("No geometry detected - using CopyRows")
        arcpy.management.CopyRows(arr, result)

    con.close()
    return result


if __name__ == "__main__":

    sql = arcpy.GetParameterAsText(0)
    result = arcpy.GetParameterAsText(1)

    result = quack(sql, result)

    arcpy.SetParameterAsText(1, result)
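
For completeness, a minimal usage sketch of the function itself; the Parquet URL below is just a placeholder for any source you can reach:

# Minimal usage sketch of quack(); the Parquet URL is a placeholder.
sql = """
    SELECT *
    FROM read_parquet('s3://my-bucket/some_dataset.parquet')
    LIMIT 1000
"""
out = quack(sql, "memory/quack_demo")
print(f"Result written to {out}")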

 

Edit: I should warn you against exposing the SQL parameter this way (without restrictions) in a Web Tool/GP Service shared to ArcGIS Enterprise, as passing it directly into the execute function gives it a lot of power. You can even list files from the file system inside the ArcSOC.exe process running the tool with SELECT * FROM glob('*'); or return the contents of the files themselves with SELECT string_agg(content, '\n') AS file_content FROM read_text(filename);
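
A minimal guard could reject anything other than a single SELECT/WITH statement before it reaches the execute call (a crude sketch, not a complete defense). DuckDB also has an enable_external_access setting that can be switched off for fully untrusted input, though that would also block the httpfs/read_parquet reads the tool is built around.

# Crude guard for the SQL parameter before passing it to DuckDB.
# This is a sketch, not a complete defense against hostile input.
import re

def check_sql(sql: str) -> str:
    statements = [s for s in sql.split(";") if s.strip()]
    if len(statements) != 1:
        raise ValueError("Exactly one SQL statement is allowed.")
    if not re.match(r"^\s*(SELECT|WITH)\b", statements[0], re.IGNORECASE):
        raise ValueError("Only SELECT queries are allowed.")
    # Block the file system functions mentioned above.
    if re.search(r"\b(glob|read_text|read_blob)\s*\(", statements[0], re.IGNORECASE):
        raise ValueError("File system functions are not allowed.")
    return statements[0]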

Esri Frequent Contributor

Thanks Matej!  This is really useful!

And I will keep publishing - in fact just finishing rewriting two posts now...