Hi @guillaumeeb,
Thanks a lot for your reply. I kind of solved it (I think?) 5 minutes ago, but I still have a related question.
My process is (code at the end of the message):
1. read a selection of variables from a parquet db
2. add empty columns for variables not present in the db, but present in another one that I am also reading
3. return the resulting dask.dataframe `ddf`
4. `from_map()` maps a function that does steps 1-3 to a list of dbs and merges all the returned `ddf`s into one (see the sketch after this list)
5. operate on the combined `ddf`
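Roughly, the `from_map()` step looks like the sketch below (simplified from my actual code; `read_db` here stands for the function that does steps 1-3, and I'm leaving out the `meta` handling):

```python
import dask.dataframe as dd

# read_db(db_name, target_schema=...) does steps 1-3 for a single db;
# from_map applies it to every db name and stitches the results into one ddf
ddf = dd.from_map(read_db, db_names, target_schema=global_schema)
```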
My error was returning `ddf` instead of `ddf[list(self.selected_variables)]`, that is, I was not ensuring that all the `ddf`s had the same column ordering.
`from_map()` fails on compute (it does not fail on `ddf.tail()`, `ddf.head()`, or `ddf.describe()`). The workaround that I have found is to read each db into a list of `ddf`s with `ddfs = [read_db_dask(db_name) for db_name in db_names]` and then perform `ddf = dd.concat(ddfs)`.
`from_map()` works on compute if I implement steps 1-3 with `pyarrow`, i.e. I read in a pyarrow table, add empty columns to it, and convert it to a pandas dataframe at the end (roughly sketched below).
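For reference, the pyarrow version of steps 1-3 looks roughly like this (a simplified sketch without the filters; `read_db_pyarrow` is just an illustrative name, not the exact function from my code):

```python
import pyarrow as pa
import pyarrow.parquet as pq


def read_db_pyarrow(db_path, selected_variables):
    """Steps 1-3 for a single db, done eagerly with pyarrow."""
    schema = pq.read_schema(db_path + "/_common_metadata")
    # step 1: read only the selected variables that exist in this db
    cols_to_read = [c for c in selected_variables if c in schema.names]
    table = pq.read_table(db_path, columns=cols_to_read)
    # step 2: append all-null columns for the variables this db is missing
    for col in selected_variables:
        if col not in table.column_names:
            table = table.append_column(col, pa.nulls(len(table)))
    # step 3: fix the column order and hand back a pandas dataframe
    return table.select(selected_variables).to_pandas()
```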
The issue that I have now is that dask seems to trigger some compute when adding the empty columns (step 2 of the process), and if I run the code on the whole database I run out of memory. The users will always access a subset of the data and might not hit this issue, but I would like them to face this waiting time when they perform some computation (a mean, a plot, anything) rather than when they retrieve the dataframe; I'd like the dataframe to stay lazy as long as possible. (I would also like to unit-test the function without subsetting the db, if possible.)
Why is this happening? I expected adding a column to be a lazy instruction.
Here are the main parts of the code:
```python
import pandas as pd
import dask.dataframe as dd
import pyarrow.parquet as pq


def read_db_dask(self, db_name, target_schema=None):
    """Read parquet database applying all filters

    Arguments:
    db_name -- database name
    target_schema -- shared schema with other dbs

    Returns:
    ddf -- dask dataframe containing the filtered data
    """
    # read schema to get the list of columns
    db_schema = pq.read_schema(self.db_paths[db_name] + "/_common_metadata")
    # read in only the selected variables that are present in the database
    cols_to_read = [name for name in self.selected_variables if name in db_schema.names]
    # read db
    ddf = dd.read_parquet(
        self.db_paths[db_name],
        engine="pyarrow",
        columns=cols_to_read,
        filters=self.filters,
        index=False,
        dtype_backend="pyarrow",
    )
    # add empty columns for variables that are present in
    # self.selected_variables but not in the current db
    cols_to_add = [name for name in self.selected_variables if name not in db_schema.names]
    for col in cols_to_add:
        print(f"Adding empty column {col} to db {db_name}")
        if target_schema is not None:
            # use the shared schema to pick the right dtype for the empty column
            field_idx = target_schema.get_field_index(col)
            pa_dtype = target_schema.types[field_idx]
            pd_dtype = self.dtype_mapping[pa_dtype]
            ddf[col] = pd.Series([pd.NA] * len(ddf), dtype=pd_dtype)
        else:
            ddf[col] = pd.Series([pd.NA] * len(ddf))
    # return the columns in a fixed order so all ddfs line up
    return ddf[list(self.selected_variables)]


ddfs = [read_db_dask(db_name, target_schema=global_schema) for db_name in db_names]
ddf = dd.concat(ddfs)
```
FWIW, I also wrote an equivalent function that reads a `pyarrow` table, adds empty columns to it, and only converts it to a dask dataframe when returning; the compute is also triggered (rough sketch below).
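That variant is essentially the `read_db_pyarrow` sketch above with a dask conversion at the end, something like this (again illustrative; the name and `npartitions=1` are made up):

```python
import dask.dataframe as dd


def read_db_pyarrow_as_ddf(db_path, selected_variables):
    # same eager pyarrow logic as above, but the result is only wrapped
    # into a dask dataframe right before returning
    pdf = read_db_pyarrow(db_path, selected_variables)
    return dd.from_pandas(pdf, npartitions=1)
```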