Thank you for assisting me with this. I have created a minimal example that should help to reproduce it.
What it does is create a dummy dataframe with large string values per partition, allowing you to experiment with different dataframe sizes.
Furthermore, I have limited the LocalCluster to 1 worker with 1 thread and 2 GB of memory per worker. My assumption was that, as long as each partition stays below 2 GB, map_partitions would process the partitions sequentially within that limit.
Unfortunately, we still run into an out-of-memory issue: the first partition is computed, then the second one, and only after that is everything written to disk, so both partition results are held in memory at the same time.
If you change the memory_limit_per_worker to 4 GB in this example, the out-of-memory issue does not occur, since both partitions then fit into memory.
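For reference, the passing configuration differs only in this one value:

memory_limit_per_worker = 4e9  # 4 GB: both partition results fit into worker memory at once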
The artificial delays and loops are primarily for simulating longer-running workloads. This makes debugging easier as well.
Requirements:
dask==2023.5.0
distributed==2023.5.0
pandas==2.0.2
pyarrow==12.0.1
Source code:
import time

import pandas as pd
from dask.distributed import LocalCluster, performance_report
import dask.dataframe as dd
from distributed import Client
# Arguments
output_dir = './tmp'
num_workers = 1
thread_per_worker = 1
memory_limit_per_worker = 2e9  # 2 GB
num_thread_saving_parquets = 1
# Helper functions
def artificial_delay(milliseconds):
    """Busy-wait for the given number of milliseconds (handy while debugging)."""
    end_time = time.time() + milliseconds / 1000
    while time.time() < end_time:
        _ = 2 + 2  # Perform a simple computation

def process_partition(df, gb_to_produce=1):
    """Dummy method for processing a partition. Large objects are created to simulate OOM."""
    print(df.head())
    print("process partition")
    artificial_delay(10000)  # Just needed to gain extra time for debugging
    counter = 0
    large_string = "t" * (1024 * 1024 * 128)  # 128 MiB (~134 MB) per row
    to_append = []
    while counter < (10 * gb_to_produce):
        counter += 1
        to_append.append([counter, large_string])
    return pd.DataFrame(data=to_append, columns=["id", "column_a"])
if __name__ == "__main__":
    worker_kwargs = {
        "n_workers": num_workers,
        "memory_limit": memory_limit_per_worker,
        "threads_per_worker": thread_per_worker,
    }
    cluster = LocalCluster(**worker_kwargs)
    client = Client(cluster)

    with performance_report(filename="export.html"):
        dataframe = dd.from_dict({"id": [0, 1], "column_a": ["a", "b"]}, npartitions=2)
        dataframe = dataframe.map_partitions(process_partition, meta=dataframe)

        index_df = dataframe[["id"]]
        column_df = dataframe[["column_a"]]

        write_tasks = [
            dd.to_parquet(index_df, output_dir, compute=False),
            dd.to_parquet(column_df, output_dir, compute=False),
        ]
        dd.compute(*write_tasks)
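As an aside, and purely as a sketch that is not part of the repro: the per-partition memory footprint can be checked with dask's memory_usage_per_partition. Note that this triggers the map_partitions computation, so it is best tried with a small gb_to_produce.

# Sketch only: report the size in bytes of each produced partition.
# deep=True is needed so the contents of the object-dtype string column are counted.
partition_sizes = dataframe.memory_usage_per_partition(deep=True).compute()
print(partition_sizes)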
In the meantime, I have also considered a potential workaround: invoke dataframe.persist() and then iterate over its futures, writing each partition to disk as soon as its future completes. This approach appears to work well, even with larger setups.
Pseudo code (the imports and the output path are illustrative):

from distributed import futures_of, wait

dataframe = dataframe.persist()
futures_list = futures_of(dataframe)

future_to_partition = {}
for partition_idx, future in enumerate(futures_list):
    future_to_partition[future] = partition_idx

while futures_list:
    completed, not_completed = wait(futures_list, return_when="FIRST_COMPLETED")
    for future in completed:
        partition_idx = future_to_partition[future]
        pandas_df = future.result()
        # Illustrative output path
        pandas_df.to_parquet(f"{output_dir}/part.{partition_idx}.parquet")
    futures_list = list(not_completed)  # keep waiting on the remaining futures
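Alternatively, the same idea can be written with distributed.as_completed instead of the manual wait loop. This is only a sketch; the output file naming is an assumption on my part:

from distributed import as_completed, futures_of

dataframe = dataframe.persist()
futures = futures_of(dataframe)
partition_of = {future: idx for idx, future in enumerate(futures)}

for future in as_completed(futures):
    idx = partition_of[future]
    # Hypothetical output layout; adjust as needed
    future.result().to_parquet(f"{output_dir}/partition-{idx}.parquet")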
Please let me know if anything is unclear or omitted. Thank you very much!