HDFS: Join Table to ArcGIS Feature Class

508
0
03-06-2018 03:36 PM
DonaldShaw5
New Contributor II

I have HDFS running and am trying to join a table from there to a feature class in ArcGIS Desktop. I can do any query I want inside the Cloudera Hue browser with the table, but I can not get it to join to my feature class in ArcGIS. I keep receiving an error that the table does not exist/is not supported. I am trying to join a ziptrips table (that has zip codes with population) to a feature class called nyc_zip. My apologies for any messy code, I have butchered this thing so far.

import arcpy
import os
import re
import requests
from pyhive import hive
import contextlib

class WebHDFSRes(object):
    def __init__(self, res):
        self.res = res

    def __enter__(self):
        return self.res

    def __exit__(self, exception_type, exception_value, traceback):
        self.res.close()


class WebHDFS(object):
    def __init__(self, host, user, port=50070):
        self.host = host
        self.port = port
        self.user = user

    def open(self, hdfs_path, offset=-1, length=-1, buffer_size=-1):
        params = {'op': 'OPEN', 'user.name': self.user}
        if offset > 0:
            params['offset'] = offset
        if length > 0:
            params['length'] = length
        if buffer_size > 0:
            params['buffersize'] = buffer_size
        url = 'http://{}:{}/webhdfs/v1{}'.format(self.host, self.port, hdfs_path)
        return WebHDFSRes(requests.get(url, params=params, stream=True))

    def list_status(self, hdfs_path, suffix_re='*'):
        files = []
        prog = re.compile(suffix_re)
        params = {'op': 'LISTSTATUS', 'user.name': self.user}
        url = 'http://{}:{}/webhdfs/v1{}'.format(self.host, self.port, hdfs_path)
        with WebHDFSRes(requests.get(url, params=params)) as res:
            doc = res.json()
            for i in doc['FileStatuses']['FileStatus']:
                path_suffix = i['pathSuffix']
                if prog.match(path_suffix):
                    files.append('{}/{}'.format(hdfs_path, path_suffix))
        return files


class Toolbox(object):
    def __init__(self):
        self.label = 'ZipTool'
        self.alias = 'ZipTool'
        self.tools = [Tool]


class Tool(object):
    def __init__(self):
        self.label = 'TripsTool'
        self.description = 'TripsTool'
        self.canRunInBackground = True

    def getParameterInfo(self):
        paramFC = arcpy.Parameter(
            name='out_fc',
            displayName='out_fc',
            direction='Output',
            datatype='Feature Layer',
            parameterType='Derived')
        #  paramFC.symbology = os.path.join(os.path.dirname(__file__), 'GPSTool.lyr')

        paramName = arcpy.Parameter(
            name='in_name',
            displayName='Name',
            direction='Input',
            datatype='GPString',
            parameterType='Required')
        paramName.value = 'TripsPoints'

        paramHost = arcpy.Parameter(
            name='in_host',
            displayName='HDFS Host',
            direction='Input',
            datatype='GPString',
            parameterType='Required')
        paramHost.value = 'quickstart'

        paramUser = arcpy.Parameter(
            name='in_user',
            displayName='User name',
            direction='Input',
            datatype='GPString',
            parameterType='Required')
        paramUser.value = 'root'

        paramPath = arcpy.Parameter(
            name='in_path',
            displayName='HDFS Path',
            direction='Input',
            datatype='GPString',
            parameterType='Required')
        paramPath.value = '/user/root/'

        paramFile = arcpy.Parameter(
            name='in_file',
            displayName='HDFS File(s)',
            direction='Input',
            datatype='GPString',
            parameterType='Required')
        paramFile.value = 'ziptrips'

        return [paramFC, paramName, paramHost, paramUser, paramPath, paramFile]

    def isLicensed(self):
        return True

    def updateParameters(self, parameters):
        return

    def updateMessages(self, parameters):
        return

    def insert_row(self, cursor, line):
        try:
            zipcode = [0]
            pop = [1]

            cursor.insertRow(zipcode, pop)
        except Exception as e:
            arcpy.AddWarning(str(e))

    def execute(self, parameters, messages):
        name = parameters[1].value
        host = parameters[2].value
        user = parameters[3].value
        path = parameters[4].value
        fext = parameters[5].value

        in_memory = True
        if in_memory:
            ws = 'in_memory'
            fc = ws + '/' + name
        else:
            fc = os.path.join(arcpy.env.scratchGDB, name)
            ws = os.path.dirname(fc)

        if arcpy.Exists(fc):
            arcpy.management.Delete(fc)

        sp_ref = arcpy.SpatialReference(4326)
        arcpy.env.workspace = "C:/Data/BigData/HV_Spatial"
        inFeature = "hivespatial.gdb/NYC_zips"
        inLayer = "nyc_lyr"
        inField = "ZCTA5CE10"
        joinFeature = parameters[5].value
        joinField = joinFeature[0]
        outFC = "join"

        arcpy.MakeFeatureLayer_management(inFeature, inLayer)
        arcpy.AddJoin_management(inLayer, inField, joinFeature, joinField)
        arcpy.CopyFeatures_management(inLayer, outFC)

        arcpy.management.CreateFeatureclass(ws, name, 'POLYGON', spatial_reference=sp_ref)
        arcpy.management.AddField(fc, 'Zip_Code', 'String', field_alias='Zip')
        arcpy.management.AddField(fc, 'POP', 'String', field_alias='POP')

        if in_memory:
            arcpy.AddMessage("Connecting...")
        with contextlib.closing(hive.connect(host, username=user, auth="NOSASL")) as conn:
            cursor = conn.cursor()
            if in_memory:
                arcpy.AddMessage("Executing...")
            cursor.execute(hql)
            with arcpy.da.InsertCursor(fc, ["Zip" "POP"]) as insert_cursor:
                if in_memory:
                    arcpy.AddMessage("Fetching...")
                for row in cursor.fetchall():
                    insert_cursor.insertRow(row)
            parameters[0].value = fc
0 Kudos
0 Replies