Hi!
I am trying to build an application capable of handling datasets with roughly 60-70 million rows, reading from CSV files. Ideally, I would like to use Dask for this, as Pandas takes a very long time to do anything with this dataset. My problem is that my code that works with Pandas gives me a KeyError in Dask. Could anyone please help me figure this out?
I am trying to build a program to manipulate large datasets (65 million+ rows) and I landed on using Dask. I have never used Dask or Pandas before, so please forgive me if there are any beginner mistakes.
What I want to do is read a large CSV file into a dataframe, change one or more columns, and then save the result to a different CSV file. I have successfully done this using Dask for one column, but for another column in the same dataset, I get a KeyError for a column that exists.
import pandas as pd
import dask.dataframe as dd
import time

def anonymizer_test():
    start_time = time.time()
    file = "data1.csv"
    try:
        df = dd.read_csv(file, dtype={'Admission_Method': 'object',
                                      'IMD_Decile_From_LSOA': 'float64',
                                      'Provider_Patient_Distance_Miles': 'float64',
                                      'Sex': 'float64',
                                      'Admitted_Flag': 'float64',
                                      })
    except Exception as e:
        print("Exception in read_csv after " + str((time.time() - start_time)) + " seconds.")
        raise e

    def generalize_timestamp(timestamp):
        try:
            return timestamp[:10]
        except:
            pass

    try:
        # the meta argument here is suggested by Dask when I get the error.
        # when running using Pandas, it has to be removed or it will cause another error.
        df['AE_Arrive_Date'] = df['AE_Arrive_Date'].apply(generalize_timestamp, meta=('AE_Arrive_Date', 'object'))
    except Exception as e:
        print("Exception in apply after " + str((time.time() - start_time)) + " seconds.")
        raise e

    try:
        df.to_csv("output2.csv")
    except Exception as e:
        print("Exception in to_csv after " + str((time.time() - start_time)) + " seconds.")
        raise e

    print("Program finished in " + str((time.time() - start_time)) + " seconds.")

anonymizer_test()
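For context, here is a minimal standalone sketch of what the generalize_timestamp helper does to individual values, using plain Python rather than Dask or pandas. The sample values are made up for illustration and are not taken from data1.csv; the float NaN stands in for the way pandas represents a missing value in an object column.

```python
def generalize_timestamp(timestamp):
    # Same idea as the helper in the program above: keep the first
    # 10 characters ("YYYY-MM-DD") and swallow any error.
    try:
        return timestamp[:10]
    except Exception:
        # Non-string values (for example float NaN) cannot be sliced,
        # so the helper ends up returning None for them.
        return None

# Hypothetical sample values, not taken from the real dataset.
samples = ["2019-03-14 08:15:00", "2019-03-14", float("nan")]
results = [generalize_timestamp(value) for value in samples]
print(results)  # ['2019-03-14', '2019-03-14', None]
```

Since the helper catches every exception it could hit, it should not be able to raise a KeyError by itself, which suggests the error originates elsewhere in the pipeline.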
When I run this code, it runs for a few seconds and creates the output2.csv file, filling it with around 500k records before raising a KeyError. I have checked both the input file and the output file, and as far as I can see, there is nothing out of the ordinary on the row where it crashes. Simply switching to Pandas to read the CSV file is enough to make it work as intended, but it takes a very long time, and if possible I would like to use Dask. The error I get is:
Exception in to_csv after 3.491415500640869 seconds.
Traceback (most recent call last):
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pandas\core\indexes\base.py", line 3621, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas\_libs\index.pyx", line 136, in pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\index.pyx", line 163, in pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas\_libs\hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'AE_Arrive_Date'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\John\Documents\k-anon\anonymizer.py", line 103, in <module>
    anonymizer_test()
  File "C:\Users\John\Documents\k-anon\anonymizer.py", line 99, in anonymizer_test
    raise e
  File "C:\Users\John\Documents\k-anon\anonymizer.py", line 96, in anonymizer_test
    df.to_csv("output2.csv")
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\dataframe\core.py", line 1640, in to_csv
    return to_csv(self, filename, **kwargs)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\dataframe\io\csv.py", line 957, in to_csv
    return list(dask.compute(*values, **compute_kwargs))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\base.py", line 575, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\threaded.py", line 81, in get
    results = get_async(
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\local.py", line 508, in get_async
    raise_exception(exc, tb)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\local.py", line 316, in reraise
    raise exc
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\local.py", line 221, in execute_task
    result = _execute_task(task, data)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 149, in get
    result = _execute_task(task, cache)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 113, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 113, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pandas\core\frame.py", line 3505, in __getitem__
    indexer = self.columns.get_loc(key)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pandas\core\indexes\base.py", line 3623, in get_loc
    raise KeyError(key) from err
KeyError: 'AE_Arrive_Date'
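The manual inspection of the input file described above could also be scripted. The following is a rough sketch, assuming the file is comma-delimited with a single header row. The function name find_ragged_rows and the sample text are hypothetical, and it uses only the standard csv module, so it says nothing about how Dask itself splits the file into partitions.

```python
import csv
import io

def find_ragged_rows(lines, limit=10):
    """Return (line_number, field_count) for rows whose field count differs from the header."""
    reader = csv.reader(lines)
    header = next(reader)
    mismatches = []
    for row in reader:
        if len(row) != len(header):
            # reader.line_num is the physical line on which this record ended.
            mismatches.append((reader.line_num, len(row)))
            if len(mismatches) >= limit:
                break
    return mismatches

# Hypothetical sample standing in for the real file.
sample = io.StringIO(
    "AE_Arrive_Date,Sex\n"
    "2019-03-14 08:15:00,1\n"
    "2019-03-15 09:30:00,2,unexpected\n"
)
mismatches = find_ragged_rows(sample)
print(mismatches)  # [(3, 3)]
```

For the real data, the same function could be called on a file opened with open("data1.csv", newline=""). An empty result would only rule out rows with the wrong number of fields, not other parsing differences between Pandas and Dask.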
I have no idea what might cause this, so I’m grateful for any help!