Dask bag from_sequence(...).map(...).compute() returning ValueError: too many values to unpack (expected 2)

07-30-2021 09:52 AM
by Anonymous User

Hello everyone,
I would appreciate it if you could help me figure out what is wrong in my code. I have a list of objects, specifically ArcGIS items. My code is basically:

import dask.bag as db

def get_attribute(i):
    return i.title

bag = db.from_sequence(itms)
result = db.map(lambda x: get_attribute(x), bag).compute()

Then I am getting this error:

ValueError: too many values to unpack (expected 2)

I thought maybe Dask bag does not work with sequences of objects and only works with sequences of literal values. But I tried the same code with objects of a simple class called Point:

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def get_x(self):
        return self.x
    
    def get_y(self):
        return self.y

def get_x(p):
    return p.get_x()
    
p1 = Point(1, 2)
p2 = Point(-3, -1)
p3 = Point(0, 0)

pnts = [p1, p2, p3]

bag = db.from_sequence(pnts)
result = db.map(lambda x: get_x(x), bag).compute()

And it works fine!

So please let me know what I'm doing wrong.

Thank you so much.

3 Replies
by Anonymous User

Hi Ahmad - could you show how 'itms' is being defined? The dask.Bag documentation doesn't list any size restrictions, but it does mention that "This sequence should be relatively small in memory. Dask Bag works best when it handles loading your data itself." It's also possible that Bag does not support a sequence of ArcGIS Item objects.

https://docs.dask.org/en/latest/generated/dask.bag.from_sequence.html#dask.bag.from_sequence
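
For reference, here is a minimal sketch of the documented pattern with plain values, using the Bag.map method (which should be equivalent to the top-level dask.bag.map when there is a single bag argument):

import dask.bag as db

# a small in-memory sequence, as the docs recommend for from_sequence
bag = db.from_sequence([1, 2, 3, 4], npartitions=2)

# the instance-method spelling of map
result = bag.map(lambda x: x * 10).compute()
print(result)  # [10, 20, 30, 40]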


Josh

by Anonymous User

Hi Josh,

I basically created the items using group.content() as a test on a small sample (9 items).

But in reality, I need to collect usage for the last year, the last 6 months, and the last 60 days for 4K items. When I do this synchronously, it takes almost 1 minute per item, which means more than 2 days to finish all 4K items.
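
The synchronous version is essentially just a plain loop (get_usage_dct is defined in the snippet below):

# one usage() round-trip per item, about a minute each,
# so roughly 2 days for all 4K items
results = [get_usage_dct(i) for i in itms]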

This is the code snippet:

import dask.bag as dbag
from arcgis.gis import GIS, Group
from datetime import datetime

# user defined function
# basically it returns a dictionary of usage metrics for each item
def get_usage_dct(i):
    today = datetime.now().strftime('%Y-%m-%d')
    i_usage_df_1y = i.usage(date_range='1Y', as_df=True)
    i_usage_df_1y = i_usage_df_1y[i_usage_df_1y['Date'] < today]
    i_usage_df_6m = i.usage(date_range='6M', as_df=True)
    i_usage_df_6m = i_usage_df_6m[i_usage_df_6m['Date'] < today]
    i_usage_df_2m = i.usage(date_range='60D', as_df=True)
    i_usage_df_2m = i_usage_df_2m[i_usage_df_2m['Date'] < today]
    return {
        'item_name': i.title,
        'usage_count_1Y': i_usage_df_1y['Usage'].sum(),
        'usage_count_6m': i_usage_df_6m['Usage'].sum(),
        'usage_count_2m': i_usage_df_2m['Usage'].sum(),
    }

# define the GIS object
mapit = GIS("home")

# getting items of a specific group (9 items for this example)
g = Group(mapit,'xxxxxxxxx')
itms = g.content()

# create items bag from the itms list
itms_bag = dbag.from_sequence(itms, npartitions=9)

# this line is causing the error
final_bag = dbag.map(lambda x: get_usage_dct(x), itms_bag).compute()

I then added the scheduler argument to compute in the last line:

final_bag = dbag.map(lambda x: get_usage_dct(x), itms_bag).compute(scheduler="threads")

But now I get:

Exception: Too many requests. Please try again later.
(Error Code: 400)

I believe this is related to API rate limits, since the item usage method is basically an API call. Please correct me if I am wrong.
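
I am wondering whether capping the number of concurrent threads would keep me under the limit; as far as I know, compute accepts a num_workers argument for the threaded scheduler (the value of 2 below is just a guess, not a known safe limit):

# cap the threaded scheduler so fewer usage() calls hit the API at once
final_bag = dbag.map(lambda x: get_usage_dct(x), itms_bag).compute(
    scheduler="threads", num_workers=2
)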

I hope you have some good advice 🙂

Thank you

by Anonymous User

Anyone?
