Hi. I am using dask.dataframe to read a very large dataset (20 TB, 97 billion knowledge-graph triples). As a first step, I am using dask.dataframe.read_csv to read a smaller version of the dataset containing 795 million triples (152 GB). The .txt file contains four whitespace-separated columns. A sample of the dataset:
<http://whale.data.dice-research.org/resource#node21f760a41de19b4a8370fd8f49f6e87e> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/ontology/wo/species> .
<http://whale.data.dice-research.org/resource#nodea7ba3274fe56fb8342b740aef391a3> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/ontology/wo/species> .
<http://whale.data.dice-research.org/resource#nodea7ba3274fe56fb8342b740aef391a3> <http://purl.org/ontology/wo/kingdom> <http://whale.data.dice-research.org/resource#node4a5dd7cade315a1a7a63e7b6881f18a> .
Context: The dataset consists of KG data with subject, relation, and object as columns 0, 1, and 2, respectively. The fourth column contains '.', marking the end of the triple. My task is to calculate the total number of triples and the total numbers of unique entities and relations in this dataset. Since the file is huge, I cannot use rdflib, as it consumes all available memory.
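For reference, splitting one of the sample lines above on whitespace yields four fields, with the trailing '.' terminator as a field of its own:

```python
# Splitting a sample line on whitespace: the trailing '.' is a fourth field.
line = ('<http://whale.data.dice-research.org/resource#nodea7ba3274fe56fb8342b740aef391a3> '
        '<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> '
        '<http://purl.org/ontology/wo/species> .')
fields = line.split()
print(len(fields))  # 4
print(fields[3])    # .
```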
I was using the following pandas code to read the dataset and calculate the stats required for the dataset:
import logging
import pandas as pd

logging.basicConfig(level=logging.INFO)

chunk_size = 1_000_000  # rows per chunk

dtype = {'subject': str, 'relation': str, 'object': str}
unique_entities = set()
unique_relations = set()
total_triples = 0
try:
    reader = pd.read_csv(file_path, sep=r"\s+", header=None,
                         names=['subject', 'relation', 'object'],
                         usecols=[0, 1, 2], dtype=dtype,
                         chunksize=chunk_size, memory_map=True,
                         on_bad_lines='warn')
    for chunk in reader:
        total_triples += len(chunk)
        unique_entities.update(chunk['subject'].unique())    # entities from 'subject'
        unique_entities.update(chunk['object'].unique())     # entities from 'object'
        unique_relations.update(chunk['relation'].unique())  # relations
except Exception as e:
    logging.error(f"An error occurred: {e}", exc_info=True)

logging.info(f'Total number of triples: {total_triples}')
logging.info(f'Number of unique entities: {len(unique_entities)}')
logging.info(f'Number of unique relations: {len(unique_relations)}')
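For comparison, the same statistics can be computed with a plain line-by-line pass that keeps only the sets in memory. A minimal sketch, run here on the three sample triples above (for the real file, swap the StringIO for open(file_path)):

```python
from io import StringIO

# The three sample triples from the question, as a stand-in for the file.
sample = (
    '<http://whale.data.dice-research.org/resource#node21f760a41de19b4a8370fd8f49f6e87e> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/ontology/wo/species> .\n'
    '<http://whale.data.dice-research.org/resource#nodea7ba3274fe56fb8342b740aef391a3> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/ontology/wo/species> .\n'
    '<http://whale.data.dice-research.org/resource#nodea7ba3274fe56fb8342b740aef391a3> <http://purl.org/ontology/wo/kingdom> <http://whale.data.dice-research.org/resource#node4a5dd7cade315a1a7a63e7b6881f18a> .\n'
)

unique_entities, unique_relations, total_triples = set(), set(), 0
for line in StringIO(sample):
    parts = line.split()
    if len(parts) < 3:
        continue  # skip blank or malformed lines
    subject, relation, obj = parts[0], parts[1], parts[2]
    total_triples += 1
    unique_entities.update((subject, obj))
    unique_relations.add(relation)

print(total_triples, len(unique_entities), len(unique_relations))  # 3 4 2
```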
For a small dataset of 84 MB, this works because it needs little RAM to hold the data. But on the very large dataset, the process is killed with an out-of-memory error. Therefore, I switched to Dask. Here is the Dask code:
import dask.dataframe as dd

ddf = dd.read_csv(file_path, sep=r"\s+", header=None,
                  usecols=[0, 1, 2], dtype=str, blocksize=25e6)
unique_entities = dd.concat([ddf[0], ddf[2]], axis=0).drop_duplicates()
unique_relations = ddf[1].drop_duplicates()
total_triples = len(ddf)
num_unique_entities = unique_entities.compute()
num_unique_relations = unique_relations.compute()
print(f"Total triples: {total_triples}")  # len() already triggered computation
print(f"Unique entities: {len(num_unique_entities)}")
print(f"Unique relations: {len(num_unique_relations)}")
Now this code works. However, when I try to use the following code:
# Reading the dataset
ddf = dd.read_csv(file_path, sep=r"\s+", header=None, usecols=[0, 1, 2],
                  names=['subject', 'relation', 'object'], dtype=str,
                  blocksize=25e6)
unique_entities = dd.concat([ddf['subject'], ddf['object']], axis=0).drop_duplicates()
unique_relations = ddf['relation'].drop_duplicates()
total_triples = len(ddf)
num_unique_entities = unique_entities.compute()
num_unique_relations = unique_relations.compute()
print(f"Total triples: {total_triples}")  # len() already triggered computation
print(f"Unique entities: {len(num_unique_entities)}")
print(f"Unique relations: {len(num_unique_relations)}")
It fails at num_unique_entities = unique_entities.compute() with:
"ValueError: An error occurred while calling the read_csv method registered to the pandas backend. Original Message: Number of passed names did not match number of header fields in the file"
Please tell me why I can't use the "names" and "usecols" arguments together in Dask. I cannot understand this. Even when I try to read a single column with Dask, I get a similar error.