Good morning,
I have a hash-partitioned dataframe from Spark (read in from parquet). I am moving everything to Dask, but the Spark hash-partitioned DF performs terribly for joins when used in Dask. I believe this is expected.
So I wish to repartition and index on a column called “source_id”, sorted, such that each of my N partitions covers a range of source_id. I then want to save this to parquet so it can be read back in already indexed and sorted in future. I want fast lookups by source_id and fast joins between multiple datasets all indexed and sorted on source_id.
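For context, the access patterns I ultimately want to be fast look roughly like this (a sketch; ddf_other and its path are hypothetical stand-ins for a second dataset prepared the same way):
import dask.dataframe as dd

# Both datasets indexed and sorted on source_id with known divisions,
# so the join can align partitions by range instead of shuffling.
ddf = dd.read_parquet('/mnt/bar/')
ddf_other = dd.read_parquet('/mnt/other/')   # hypothetical second dataset

# Index-aligned join between the two datasets
joined = ddf.join(ddf_other, how='inner', rsuffix='_other')

# Point lookup by source_id, ideally touching only the relevant partition
some_source_id = 612340879030728064  # an example source_id value
row = ddf.loc[some_source_id].compute()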
But I am having issues recovering the index and divisions info when reading the data back in.
MWE below:
Repartition and save:
import dask.dataframe as dd

# Config
num_new_partitions = 20
in_dataset = '/mnt/foo/part.1.parquet' # I have thousands of files, but am trying with 1 for now
out_dataset = '/mnt/bar/'
# Read in
ddf = dd.read_parquet(in_dataset)
# Sort by source_id
ddf = ddf.sort_values('source_id')
# Define a sorted index on source_id
ddf = ddf.set_index('source_id', sorted=True)
# Repartition
ddf = ddf.repartition(npartitions=num_new_partitions)
# Write to disk
ddf.to_parquet(out_dataset, write_index=True)
The above creates 20 files.
Further, if I check:
# Check the _meta, known_divisions etc
print(ddf._meta)
Empty DataFrame
Columns: [List of columns, redacted for brevity but source_id is not one of them]
Index: []
[0 rows x 25 columns]
print(ddf.known_divisions)
True
print(ddf.divisions)
(List of divisions, redacted for brevity)
print(ddf.index.name)
source_id
Question 1: ddf.index.name shows “source_id”, but ddf._meta shows an empty Index: []. Why?
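(For what it's worth, a plain pandas frame with a named but empty index seems to print the same way, so this may just be a repr quirk rather than the index name actually being lost; minimal check below, unrelated to dask.)
import pandas as pd

# An empty pandas DataFrame with a *named* index still prints "Index: []"
# without the name, which looks the same as the dask _meta repr above.
empty = pd.DataFrame(columns=['a'], index=pd.Index([], name='source_id'))
print(empty)             # Empty DataFrame / Columns: [a] / Index: []
print(empty.index.name)  # source_id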
Now, on to reading the data back in. First, inspecting with pyarrow:
import json
import pyarrow.parquet as pq

# Read the parquet metadata
parquet_file = pq.ParquetFile("/mnt/bar/part.1.parquet")
print(parquet_file.metadata)
<pyarrow._parquet.FileMetaData object at 0x7fd9558fdd50>
created_by: parquet-cpp-arrow version 16.1.0
num_columns: 26
num_rows: 2419
num_row_groups: 1
format_version: 2.6
serialized_size: 21292
print(parquet_file.schema)
<pyarrow._parquet.ParquetSchema object at 0x7fd9564e2780>
required group field_id=-1 schema {
# Redacted most for brevity, but it includes source_id
optional int64 field_id=-1 source_id;
}
# Pulling out the ranges
metadata_dict = {}
for row_group_idx in range(parquet_file.num_row_groups):
    row_group = parquet_file.metadata.row_group(row_group_idx)
    metadata_dict[f"Row Group {row_group_idx}"] = {}
    for col_idx in range(row_group.num_columns):
        column_meta = row_group.column(col_idx)
        col_name = parquet_file.schema.column(col_idx).name
        metadata_dict[f"Row Group {row_group_idx}"][col_name] = {
            "min": column_meta.statistics.min if column_meta.statistics else None,
            "max": column_meta.statistics.max if column_meta.statistics else None,
            "null_count": column_meta.statistics.null_count if column_meta.statistics else None,
        }
# Print raw metadata JSON
print(json.dumps(metadata_dict, indent=4))
# Redacted most for brevity but ...
"source_id": {
"min": 612340879030728064,
"max": 1221753891743300864,
"null_count": 0
}
# Get the pandas index
metadata = parquet_file.metadata.metadata
if b"pandas" in metadata:
    pandas_metadata = json.loads(metadata[b"pandas"])
    print("Stored Index Columns:", pandas_metadata.get("index_columns", []))
Stored Index Columns: ['source_id']
From my perspective this all looks good: there are min/max statistics for the source_id column, and it is recorded as an index column in the pandas metadata.
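To double-check that the 20 output files really cover sorted, non-overlapping source_id ranges, a loop like the following over the footers should confirm it (a sketch; it assumes the output files match /mnt/bar/*.parquet and that statistics are present, as above):
import glob
import pyarrow.parquet as pq

# Sketch: print the source_id min/max from each output file's footer so the
# per-file ranges can be eyeballed for ordering and overlap.
for path in sorted(glob.glob('/mnt/bar/*.parquet')):
    pf = pq.ParquetFile(path)
    names = [pf.schema.column(i).name for i in range(pf.metadata.num_columns)]
    col_idx = names.index('source_id')
    stats = [pf.metadata.row_group(rg).column(col_idx).statistics
             for rg in range(pf.metadata.num_row_groups)]
    print(path, min(s.min for s in stats), max(s.max for s in stats))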
Now on to the Dask DataFrame:
dataset = '/mnt/bar/*.parquet'
# Attempt 1, naive
ddf = dd.read_parquet(
dataset,
)
print(ddf._meta)
Empty DataFrame
Columns: [Redacted, source_id not present]
Index: []
print(ddf.index.name)
source_id
print(ddf.known_divisions)
False
print(ddf.divisions)
(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
# Attempt 2, use index on the read
ddf = dd.read_parquet(
dataset,
index='source_id'
)
print(ddf._meta)
Empty DataFrame
Columns: [Redacted, source_id not present]
Index: []
print(ddf.index.name)
source_id
print(ddf.known_divisions)
False
print(ddf.divisions)
(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
# Attempt 3, use set_index
ddf = dd.read_parquet(
dataset,
index='source_id'
)
ddf = ddf.set_index('source_id', sorted=True)
print(ddf._meta)
Empty DataFrame
Columns: [Redacted, source_id not present]
Index: []
print(ddf.index.name)
source_id
print(ddf.known_divisions)
False
print(ddf.divisions)
(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
# Attempt 4, use persist to force a distributed computation
ddf = dd.read_parquet(
dataset,
index='source_id'
)
ddf = ddf.set_index('source_id', sorted=True).persist()
# Wait until above has completed
print(ddf._meta)
Empty DataFrame
Columns: [Redacted, source_id not present]
Index: []
print(ddf.index.name)
source_id
print(ddf.known_divisions)
False
print(ddf.divisions)
(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
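A further check I could run (but haven't yet) is to compute the per-partition index min/max, to see whether the data itself is still range-ordered even though the divisions come back as all None:
# Sketch: per-partition index min/max, to see whether the partitions are
# still range-ordered even though ddf.divisions is all None.
part_mins = ddf.map_partitions(lambda df: df.index.min()).compute()
part_maxes = ddf.map_partitions(lambda df: df.index.max()).compute()
print(list(zip(part_mins, part_maxes)))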
My goal is to partition the dataset into N files, with the “source_id” column used as the index and sorted, and then to join this with other datasets that will also have “source_id” as a sorted index.
I obviously don’t want to have to sort and partition every time I load the data. How can I achieve this?
Thanks