I have HDFS running and am trying to join a table from there to a feature class in ArcGIS Desktop. I can do any query I want inside the Cloudera Hue browser with the table, but I can not get it to join to my feature class in ArcGIS. I keep receiving an error that the table does not exist/is not supported. I am trying to join a ziptrips table (that has zip codes with population) to a feature class called nyc_zip. My apologies for any messy code, I have butchered this thing so far.
import arcpy import os import re import requests from pyhive import hive import contextlib class WebHDFSRes(object): def __init__(self, res): self.res = res def __enter__(self): return self.res def __exit__(self, exception_type, exception_value, traceback): self.res.close() class WebHDFS(object): def __init__(self, host, user, port=50070): self.host = host self.port = port self.user = user def open(self, hdfs_path, offset=-1, length=-1, buffer_size=-1): params = {'op': 'OPEN', 'user.name': self.user} if offset > 0: params['offset'] = offset if length > 0: params['length'] = length if buffer_size > 0: params['buffersize'] = buffer_size url = 'http://{}:{}/webhdfs/v1{}'.format(self.host, self.port, hdfs_path) return WebHDFSRes(requests.get(url, params=params, stream=True)) def list_status(self, hdfs_path, suffix_re='*'): files = [] prog = re.compile(suffix_re) params = {'op': 'LISTSTATUS', 'user.name': self.user} url = 'http://{}:{}/webhdfs/v1{}'.format(self.host, self.port, hdfs_path) with WebHDFSRes(requests.get(url, params=params)) as res: doc = res.json() for i in doc['FileStatuses']['FileStatus']: path_suffix = i['pathSuffix'] if prog.match(path_suffix): files.append('{}/{}'.format(hdfs_path, path_suffix)) return files class Toolbox(object): def __init__(self): self.label = 'ZipTool' self.alias = 'ZipTool' self.tools = [Tool] class Tool(object): def __init__(self): self.label = 'TripsTool' self.description = 'TripsTool' self.canRunInBackground = True def getParameterInfo(self): paramFC = arcpy.Parameter( name='out_fc', displayName='out_fc', direction='Output', datatype='Feature Layer', parameterType='Derived') # paramFC.symbology = os.path.join(os.path.dirname(__file__), 'GPSTool.lyr') paramName = arcpy.Parameter( name='in_name', displayName='Name', direction='Input', datatype='GPString', parameterType='Required') paramName.value = 'TripsPoints' paramHost = arcpy.Parameter( name='in_host', displayName='HDFS Host', direction='Input', datatype='GPString', parameterType='Required') paramHost.value = 'quickstart' paramUser = arcpy.Parameter( name='in_user', displayName='User name', direction='Input', datatype='GPString', parameterType='Required') paramUser.value = 'root' paramPath = arcpy.Parameter( name='in_path', displayName='HDFS Path', direction='Input', datatype='GPString', parameterType='Required') paramPath.value = '/user/root/' paramFile = arcpy.Parameter( name='in_file', displayName='HDFS File(s)', direction='Input', datatype='GPString', parameterType='Required') paramFile.value = 'ziptrips' return [paramFC, paramName, paramHost, paramUser, paramPath, paramFile] def isLicensed(self): return True def updateParameters(self, parameters): return def updateMessages(self, parameters): return def insert_row(self, cursor, line): try: zipcode = [0] pop = [1] cursor.insertRow(zipcode, pop) except Exception as e: arcpy.AddWarning(str(e)) def execute(self, parameters, messages): name = parameters[1].value host = parameters[2].value user = parameters[3].value path = parameters[4].value fext = parameters[5].value in_memory = True if in_memory: ws = 'in_memory' fc = ws + '/' + name else: fc = os.path.join(arcpy.env.scratchGDB, name) ws = os.path.dirname(fc) if arcpy.Exists(fc): arcpy.management.Delete(fc) sp_ref = arcpy.SpatialReference(4326) arcpy.env.workspace = "C:/Data/BigData/HV_Spatial" inFeature = "hivespatial.gdb/NYC_zips" inLayer = "nyc_lyr" inField = "ZCTA5CE10" joinFeature = parameters[5].value joinField = joinFeature[0] outFC = "join" arcpy.MakeFeatureLayer_management(inFeature, inLayer) arcpy.AddJoin_management(inLayer, inField, joinFeature, joinField) arcpy.CopyFeatures_management(inLayer, outFC) arcpy.management.CreateFeatureclass(ws, name, 'POLYGON', spatial_reference=sp_ref) arcpy.management.AddField(fc, 'Zip_Code', 'String', field_alias='Zip') arcpy.management.AddField(fc, 'POP', 'String', field_alias='POP') if in_memory: arcpy.AddMessage("Connecting...") with contextlib.closing(hive.connect(host, username=user, auth="NOSASL")) as conn: cursor = conn.cursor() if in_memory: arcpy.AddMessage("Executing...") cursor.execute(hql) with arcpy.da.InsertCursor(fc, ["Zip" "POP"]) as insert_cursor: if in_memory: arcpy.AddMessage("Fetching...") for row in cursor.fetchall(): insert_cursor.insertRow(row) parameters[0].value = fc