Dask gives KeyError with read_csv

Hi!

I am trying to build an application capable of handling datasets with roughly 60-70 million rows, reading from CSV files. I have never used Dask or Pandas before, so please forgive me if there are any beginner mistakes. What I want to do is read a large CSV file into a dataframe, change one or more columns, and then save the result to a different CSV file. Ideally I would like to use Dask for this, as Pandas takes a very long time to do anything with this dataset. I have successfully done this using Dask for one column, but for another column in the same dataset, my code, which works with Pandas, gives me a KeyError for a column that exists. Could anyone please help me figure this out?

import pandas as pd
import dask.dataframe as dd
import time

def anonymizer_test():
    start_time = time.time()
    file = "data1.csv"

    try:
        df = dd.read_csv(file, dtype={'Admission_Method': 'object',
                                      'IMD_Decile_From_LSOA': 'float64',
                                      'Provider_Patient_Distance_Miles': 'float64',
                                      'Sex': 'float64',
                                      'Admitted_Flag': 'float64',
                                      })
    except Exception as e:
        print("Exception in read_csv after " + str(time.time() - start_time) + " seconds.")
        raise e

    # Keep only the date part (first 10 characters) of the timestamp;
    # non-string values such as NaN fall through and become None.
    def generalize_timestamp(timestamp):
        try:
            return timestamp[:10]
        except Exception:
            return None

    try:
        # Dask suggested the meta argument when I first got the error.
        # When running with Pandas, it has to be removed or it causes another error.
        df['AE_Arrive_Date'] = df['AE_Arrive_Date'].apply(generalize_timestamp, meta=('AE_Arrive_Date', 'object'))
    except Exception as e:
        print("Exception in apply after " + str(time.time() - start_time) + " seconds.")
        raise e

    try:
        df.to_csv("output2.csv")
    except Exception as e:
        print("Exception in to_csv after " + str(time.time() - start_time) + " seconds.")
        raise e

    print("Program finished in " + str(time.time() - start_time) + " seconds.")

anonymizer_test()

Running this code, it works for a few seconds and creates the output2.csv file, writing around 500k records before I get a KeyError. I have checked both the input file and the output file, and as far as I can see there is nothing out of the ordinary on the row where it crashes. Simply switching to Pandas to read the CSV file is enough to make it work as intended, but that takes a very long time, so if possible I would like to use Dask. The error I get is:

Exception in to_csv after 3.491415500640869 seconds.
Traceback (most recent call last):
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pandas\core\indexes\base.py", line 3621, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas\_libs\index.pyx", line 136, in pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\index.pyx", line 163, in pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas\_libs\hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'AE_Arrive_Date'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\John\Documents\k-anon\anonymizer.py", line 103, in <module>
    anonymizer_test()
  File "C:\Users\John\Documents\k-anon\anonymizer.py", line 99, in anonymizer_test
    raise e
  File "C:\Users\John\Documents\k-anon\anonymizer.py", line 96, in anonymizer_test
    df.to_csv("output2.csv")
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\dataframe\core.py", line 1640, in to_csv
    return to_csv(self, filename, **kwargs)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\dataframe\io\csv.py", line 957, in to_csv
    return list(dask.compute(*values, **compute_kwargs))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\base.py", line 575, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\threaded.py", line 81, in get
    results = get_async(
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\local.py", line 508, in get_async
    raise_exception(exc, tb)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\local.py", line 316, in reraise
    raise exc
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\local.py", line 221, in execute_task
    result = _execute_task(task, data)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 149, in get
    result = _execute_task(task, cache)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 113, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 113, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pandas\core\frame.py", line 3505, in __getitem__
    indexer = self.columns.get_loc(key)
  File "C:\Users\John\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pandas\core\indexes\base.py", line 3623, in get_loc
    raise KeyError(key) from err
KeyError: 'AE_Arrive_Date'

I have no idea what might cause this, so I’m grateful for any help!

Hi! Thanks for reporting this. Do you have a small sample dataset you can share? Obviously 65M+ rows would be too big to upload, but maybe a few hundred or a few thousand rows?


Hi, unfortunately I do not have any datasets like that. If it is of any interest, the one I am using is a free dataset that can be found (compressed) here: Hospital Discharge Data Use Agreement

Hi,

Sorry for the delay in replying! I downloaded the 2015 tab-separated dataset from that site, but it doesn't have the AE_Arrive_Date column. Can you please be more specific about which dataset you're using?

Thanks!


Edit:

I’m very sorry, I misremembered the source of the data. The actual dataset used can be found here: A&E Synthetic Data - Datasets - NHS England Data Catalogue

Once again, I’m sorry for wasting your time.


No problem, not a waste of time at all; I got to learn about a new dataset :smile:

Thanks for sharing the dataset! I’m able to reproduce this.

The issue seems to be that Dask isn't able to identify the header for this dataset, even with header=0. We can tell because df.tail() displays an odd header when run on your example. This is also why you're getting a KeyError during the apply: the AE_Arrive_Date column (header) isn't available in some partitions. I'm not sure why yet, but I'll keep looking into this. Here's my notebook if you're interested: dask-read-csv-keyerror.ipynb · GitHub
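
If you want to see this on your own copy, here's roughly the check I ran (a minimal sketch; "data1.csv" is the file name from your example):

import dask.dataframe as dd

ddf = dd.read_csv("data1.csv")

print(ddf.columns)  # column names Dask infers from the start of the file
print(ddf.head())   # rows from the first partition
print(ddf.tail())   # rows from the last partition; this is where the odd header appears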

Explicitly specifying the columns in read_csv fixed it:

import dask.dataframe as dd

cols = ['IMD_Decile_From_LSOA',
        'Age_Band',
        'Sex',
        'AE_Arrive_Date',
        'AE_Arrive_HourOfDay',
        'AE_Time_Mins',
        'AE_HRG',
        'AE_Num_Diagnoses',
        'AE_Num_Investigations',
        'AE_Num_Treatments',
        'AE_Arrival_Mode',
        'Provider_Patient_Distance_Miles',
        'ProvID',
        'Admitted_Flag',
        'Admission_Method',
        'ICD10_Chapter_Code',
        'Treatment_Function_Code',
        'Length_Of_Stay_Days']

ddf = dd.read_csv("data/AESyntheticData.csv", names=cols)
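
One caveat I should flag (an assumption on my part, not something I verified against this dataset): when you pass names=, read_csv treats the first line of the file as data, so if the file still has its header row you may also want header=0 so that row is replaced rather than read as a record:

ddf = dd.read_csv("data/AESyntheticData.csv", names=cols, header=0)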

This annoying error means that Pandas cannot find your column name in your dataframe. Before doing anything with the dataframe, use print(df.columns) to check whether the column actually exists:

print(df.columns)
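
With Dask specifically, it can also be worth confirming that every partition sees the same columns, since a mis-detected header may only break some partitions. A rough sketch, assuming the same data1.csv from the original post:

import dask.dataframe as dd

ddf = dd.read_csv("data1.csv")

# Pull the first row of every partition; a stray header line shows up
# as a row whose values are the column names themselves.
print(ddf.map_partitions(lambda part: part.head(1)).compute())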

I was getting a similar kind of error in one of my scripts. It turned out that the particular index was missing from my dataframe because I had dropped two empty rows. If this is the case for you, you can do df.reset_index(inplace=True) and the error should be resolved.
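
For example, a toy version of that situation (hypothetical data, just to illustrate):

import pandas as pd

df = pd.DataFrame({"value": [1.0, None, 3.0]})
df = df.dropna()                         # the row at index 1 is gone, so df.loc[1] raises KeyError
df.reset_index(drop=True, inplace=True)  # rebuild a contiguous 0..n-1 index (drop=True discards the old one)
print(df.loc[1])                         # works again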

The KeyError can be pretty misleading.

If this helps someone: triple-check that you do not have duplicate column names. A duplicate can lead to a KeyError that is not even related to the duplicated column.
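
A quick way to check for that (an illustrative sketch; duplicate names often sneak in through concat or merge rather than read_csv):

import pandas as pd

# Build a frame with two columns that share the name "Sex".
df = pd.concat([pd.DataFrame({"Sex": [1.0]}), pd.DataFrame({"Sex": [0.0]})], axis=1)
print(df.columns[df.columns.duplicated()])  # Index(['Sex'], dtype='object')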