Select to view content in your preferred language

MultiProcessing with ArcGIS Server

171
1
3 weeks ago
RileyTuccio
New Contributor

I'm currently working through integrating the python multiprocessing module into my code that takes zipped shapefiles, unzips them, makes feature classes using those shapefiles as a template to then be put in a scratch geodatabase, and perform functions on them (from basic "add fields" to SA tools). I'm running into issues that I feel like I have exhausted every resource I've come across to understand. My proficiency in multiprocessing is next to none, so I'm hoping it's something obvious. Thank you for any feedback you can give me!

 

import arcpy
import os
import multiprocessing
from pathlib import Path
import zipfile
import multiprocessing

def unzipping(decadeFilePath): #unzips all the shapefiles in a directory
    fileList = []
    files = Path(decadeFilePath).glob('*')
    for file in files:
        oFileName = file.name
        print(oFileName)
        with zipfile.ZipFile(file, 'r') as zip_ref:
            zip_ref.extractall(decadeFilePath)
        x = oFileName.split('.zip')
        fileName =''.join(x)
        print(f"Extracted {fileName}")
        fileList.append(fileName)
    return fileList #returns a list of the names to create geodatabases


def deleting(decadeFilePath):
    extensions = ["cpg", "dbf", "prj", "sbn", "sbx", "shp.xml","shp","shx"] 
    for filename in os.listdir(decadeFilePath):
        filePath = os.path.join(decadeFilePath, filename)
        if os.path.isfile(filePath):
            if any(filename.lower().endswith(f".{ext}") for ext in extensions):
                try:
                    os.remove(filePath)
                    print(f"Deleted file: {filePath}")
                except Exception as e:
                    print(f"Error deleting file {filePath}: {e}")


def createFeatureClasses(gdb,files,shapefiles,decadeFilePath):
    for fileN in files:
        xyz = fileN.split('__')
        fileName = xyz[1]
        if arcpy.Exists(gdb+rf'/{fileName}'):  #checking to make sure gdb doesn't exist
            print('featureclass already created')
        else:
#            arcpy.management.CreateFeatureclass(gdb, f'{fileName}', "POLYGON", 
#                                    shapefiles + rf'/{fileN}.shp', "DISABLED", "DISABLED", 
#                                    shapefiles + rf'/{fileN}.shp')
            arcpy.management.CreateFeatureclass(
                out_path=gdb,
                out_name=f'{fileName}',
                geometry_type="POLYGON",
                template=decadeFilePath + rf'/{fileN}.shp',
                has_m="DISABLED",
                has_z="DISABLED",
                spatial_reference="",
                config_keyword="",
                spatial_grid_1=0,
                spatial_grid_2=0,
                spatial_grid_3=0,
                out_alias=""
            )
            arcpy.management.Append(
                inputs=decadeFilePath + rf'/{fileN}.shp',
                target=gdb+rf'/{fileName}',
                schema_type="TEST",
                field_mapping=None,
                subtype="",
                expression="",
                match_fields=None,
                update_geometry="NOT_UPDATE_GEOMETRY"
            )


def addfield(fc):
    print(fc)
    arcpy.management.AddField(fc, "area", "DOUBLE", "", "", "", "", "", "", "") ##test function for multiprocessing
    print(fc)


def main():
    jobs = []
    decadeFilePath = r"/path/to/templatefiles"
    shapefiles = r"/path/to/output"
 
    deleting(decadeFilePath)
    files = unzipping(decadeFilePath)
    gdb = arcpy.env.scratchGDB
    print(arcpy.env.scratchGDB)
    print(gdb)
    createFeatureClasses(gdb,files,shapefiles,decadeFilePath)
    arcpy.env.workspace = gdb
    arcpy.env.scratchWorkspace = r"/data/arcgis/server/temp/scratch.gdb"
    prc = 46
    pool = multiprocessing.Pool(prc)
    fcs = arcpy.ListFeatureClasses('*')
    fc_list = [os.path.join(gdb, fc) for fc in fcs]
    for fc in fc_list:
        print(f'passing {fc} to datapool')
        results = pool.apply_async(addfield,(fc,))
        jobs.append(results)
    for job in jobs:
        results = [result.get() for result in jobs]
        print("results")
 

if __name__ == '__main__':
    main()
    print("done")    
0 Kudos
1 Reply
HaydenWelch
Frequent Contributor

I think you might have a syntax error in your apply_async call, you're passing it a tuple when you should be passing a string path.

You're also just printing "results" instead of f"{results}"

 

I'm also not a multiprocessing expert or anything, but your main function is applying the asynchronous stuff within a sequential loop which might cause issues. I think using something like starmap might be better, but I haven't tested and again am no expert on this subject.

0 Kudos