Filter df based on indices of other dfs

I have four dfs, all backed by reasonably partitioned Parquet files.

The four dfs are role_df, school_df, company_df, and profile_df.

What I’m trying to do, in pandas:

org_ids = company_df.index.union(school_df.index)
profile_ids = profile_df.index

filtered_role_df = role_df[role_df.org_id.isin(org_ids) & role_df.profile_id.isin(profile_ids)]

# now persist filtered_role_df

In English: I’m trying to combine the indices of school_df and company_df into one set-like object, org_ids, and take the index of profile_df as another set-like object, profile_ids. I then want to filter role_df so that it keeps only the rows whose profile_id appears in profile_ids and whose org_id appears in org_ids.
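To make the intended semantics concrete, here is a minimal pandas sketch on toy data. The tiny frames and their values are invented purely for illustration; only the df and column names come from the question.

```python
import pandas as pd

# Toy stand-ins for the real frames (values are made up for illustration).
company_df = pd.DataFrame({"name": ["Acme", "Globex"]},
                          index=pd.Index([10, 11], name="org_id"))
school_df = pd.DataFrame({"name": ["Some School"]},
                         index=pd.Index([20], name="org_id"))
profile_df = pd.DataFrame({"name": ["Ann", "Bob"]},
                          index=pd.Index([1, 2], name="profile_id"))

role_df = pd.DataFrame({
    "profile_id": [1, 2, 3, 1],
    "org_id": [10, 20, 11, 99],
})

# Union of the two organisation indices, plus the profile index.
org_ids = company_df.index.union(school_df.index)
profile_ids = profile_df.index

# Keep a role only if both of its ids are known.
mask = role_df["org_id"].isin(org_ids) & role_df["profile_id"].isin(profile_ids)
filtered_role_df = role_df[mask]
```

Here the third row is dropped because profile 3 is unknown, and the fourth because org 99 is unknown.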

role_df (and the filtered version) is too large to fit into memory, so I’m hoping that, behind the scenes, the filtering can be applied to each partition separately and the result saved to Parquet again.

I’ve hit different issues trying to do this:

  • there is no .union method implemented on Dask indices,
  • when I materialize the indices with .compute and then convert them into a set, saving to Parquet stalls and I get many warnings like WARNING - full garbage collections took 29% CPU time recently (threshold: 10%)

What I found to work: materialize the indices, don’t convert to sets, union them, and then proceed

profile_ids = profile_df.index.compute()
company_ids = company_df.index.compute()
school_ids = school_df.index.compute()

org_ids = company_ids.union(school_ids)

filtered_role_df = role_df[role_df.org_id.isin(org_ids) & role_df.profile_id.isin(profile_ids)]

dd.to_parquet(filtered_role_df, ...)

This works, but I still get a warning:

UserWarning: Sending large graph of size 479.65 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(

It seems unavoidable that each thread should receive both org_ids and profile_ids, right? The warning suggests scattering. If I do so, I get TypeError: only list-like objects are allowed to be passed to isin(), you passed a [Future] for this line filtered_role_df =

Why does passing sets not work?

Am I doing anything blatantly wrong in my working solution? Is there any way to improve this?

NOTE:
I’m running this locally, like so:

cluster = LocalCluster(n_workers=4, processes=True, threads_per_worker=4)
client = Client(cluster)

If I wanted to scale this I’m not sure if it’s better to, e.g., have 8 workers and 4 threads/worker, or 4 workers and 8 threads/worker.

It would help to have a reproducer to play with. That said, if I had to implement this in a distributed way, I think I’d use an outer merge to build an org_ids Dask Series, and then two inner joins on org_id and profile_id to keep the filtered values. I think this approach would avoid materializing a huge set of IDs and would work fully distributed.
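The join-based idea can be sketched in plain pandas on toy data, as below. This is only an illustration of the logic, not a tested Dask pipeline: the helper name filter_roles_by_joins and the toy values are hypothetical, the ids in each index are assumed to be unique (duplicates would multiply rows in the inner joins), and whether the same calls carry over unchanged to dask.dataframe is an assumption that would need checking against real data.

```python
import pandas as pd


def filter_roles_by_joins(role_df, company_df, school_df, profile_df):
    """Filter role_df with joins instead of isin() (hypothetical helper)."""
    # Turn each index into a one-column key frame.
    company_keys = company_df.index.to_frame(index=False, name="org_id")
    school_keys = school_df.index.to_frame(index=False, name="org_id")
    profile_keys = profile_df.index.to_frame(index=False, name="profile_id")

    # Outer merge on the key column yields the union of organisation ids.
    org_keys = company_keys.merge(school_keys, on="org_id", how="outer")

    # Two inner joins keep only roles whose ids exist on both sides.
    kept = role_df.merge(org_keys, on="org_id", how="inner")
    kept = kept.merge(profile_keys, on="profile_id", how="inner")
    return kept


# Toy data, invented for illustration.
company_df = pd.DataFrame(index=pd.Index([10, 11], name="org_id"))
school_df = pd.DataFrame(index=pd.Index([20], name="org_id"))
profile_df = pd.DataFrame(index=pd.Index([1, 2], name="profile_id"))
role_df = pd.DataFrame({"profile_id": [1, 2, 3, 1],
                        "org_id": [10, 20, 11, 99]})

result = filter_roles_by_joins(role_df, company_df, school_df, profile_df)
```

The trade-off is that joins may trigger a shuffle, whereas isin() with an in-memory index ships the ids to every task; which is cheaper likely depends on how large the id sets are relative to role_df.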