I have four dfs, all backed by reasonably-partitioned Parquet files. The four dfs are `role_df`, `school_df`, `company_df`, and `profile_df`.
What I’m trying to do, in pandas:

```python
org_ids = company_df.index.union(school_df.index)
profile_ids = profile_df.index
filtered_role_df = role_df[role_df.org_id.isin(org_ids) & role_df.profile_id.isin(profile_ids)]
# now persist filtered_role_df
```
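As a runnable toy version of the same logic (with made-up IDs; the real frames are Parquet-backed and far too large for this):

```python
import pandas as pd

# hypothetical toy frames standing in for the real Parquet-backed ones
company_df = pd.DataFrame(index=pd.Index([10, 11], name="org_id"))
school_df = pd.DataFrame(index=pd.Index([11, 12], name="org_id"))
profile_df = pd.DataFrame(index=pd.Index([1, 2], name="profile_id"))
role_df = pd.DataFrame({"org_id": [10, 12, 99], "profile_id": [1, 2, 1]})

org_ids = company_df.index.union(school_df.index)   # Index([10, 11, 12])
profile_ids = profile_df.index

# keep only roles whose org and profile both appear in the id sets
filtered_role_df = role_df[
    role_df.org_id.isin(org_ids) & role_df.profile_id.isin(profile_ids)
]
# the row with org_id 99 is dropped
```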
In English: I’m trying to combine the indices of `school_df` and `company_df` into one set-like object, `org_ids`; take the index of `profile_df` as another set-like object, `profile_ids`; and then filter `role_df` by keeping only rows whose `org_id` and `profile_id` appear in `org_ids` and `profile_ids`, respectively.

`role_df` (and the filtered version) is too large to fit into memory, so I’m hoping that behind the scenes the filtering can happen on each partition separately, with the result then saved to Parquet again.
I’ve hit different issues trying to do this:

- there is no `.union` method implemented on Dask indices;
- when I materialize the indices with `.compute()` and then convert them into a set, saving to Parquet stalls and I get many warnings like `WARNING - full garbage collections took 29% CPU time recently (threshold: 10%)`.
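The first issue is specific to Dask's lazy index; once materialized, the resulting pandas `Index` does have `.union`, which is what the working approach below relies on:

```python
import pandas as pd

# once the Dask indices are materialized, they are plain pandas Indexes
company_ids = pd.Index([1, 2, 3])
school_ids = pd.Index([3, 4])

# pandas Index.union deduplicates and returns an Index (not a set),
# so the result can be handed straight to isin()
org_ids = company_ids.union(school_ids)
```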
What I found to work: materialize the indices, don’t convert them to sets, union them, and then proceed:

```python
profile_ids = profile_df.index.compute()
company_ids = company_df.index.compute()
school_ids = school_df.index.compute()
org_ids = company_ids.union(school_ids)
filtered_role_df = role_df[role_df.org_id.isin(org_ids) & role_df.profile_id.isin(profile_ids)]
dd.to_parquet(filtered_role_df, ...)
```
This works, but I still get a warning:

```
UserWarning: Sending large graph of size 479.65 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
```
It seems unavoidable that each thread should receive both `org_ids` and `profile_ids`, right? The warning suggests scattering. But if I scatter them, the `filtered_role_df = ...` line raises `TypeError: only list-like objects are allowed to be passed to isin(), you passed a [Future]`.
Why does passing sets not work?
Am I doing anything blatantly wrong in my working solution? Is there any way to improve this?
NOTE: I’m running this locally, like so:

```python
cluster = LocalCluster(n_workers=4, processes=True, threads_per_worker=4)
client = Client(cluster)
```
If I wanted to scale this up, I’m not sure whether it would be better to have, e.g., 8 workers and 4 threads per worker, or 4 workers and 8 threads per worker.