Hi all! I’m trying to load two CSV files into Dask dataframes, perform left joins using dd.merge,
and save the result back into a CSV file. However, I encounter an "Error while decompressing: invalid input"
error. I am not able to identify the source of the issue. Loading the two CSV files separately and saving them back as CSV seems to work.
Appreciate any help on this!
Stacktrace:
Traceback (most recent call last):
File "/work/scdatahub/Data_Cleaning/Python_Function/geocode - apparel.py", line 102, in <module>
merged.to_csv('/work/scdatahub/Data_Cleaning/Output_Data/apparel_geocoded.csv')
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/dataframe/core.py", line 1560, in to_csv
return to_csv(self, filename, **kwargs)
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/dataframe/io/csv.py", line 957, in to_csv
return list(dask.compute(*values, **compute_kwargs))
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/base.py", line 573, in compute
results = schedule(dsk, keys, **kwargs)
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/threaded.py", line 81, in get
results = get_async(
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/local.py", line 506, in get_async
raise_exception(exc, tb)
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/local.py", line 314, in reraise
raise exc
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/local.py", line 219, in execute_task
result = _execute_task(task, data)
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/dataframe/shuffle.py", line 815, in collect
res = p.get(part)
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/core.py", line 73, in get
return self.get([keys], **kwargs)[0]
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/core.py", line 79, in get
return self._get(keys, **kwargs)
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/encode.py", line 29, in _get
return [self.join([self.decode(frame) for frame in framesplit(chunk)])
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/encode.py", line 29, in <listcomp>
return [self.join([self.decode(frame) for frame in framesplit(chunk)])
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/encode.py", line 29, in <listcomp>
return [self.join([self.decode(frame) for frame in framesplit(chunk)])
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/pandas.py", line 201, in deserialize
blocks = [block_from_header_bytes(h, b)
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/pandas.py", line 201, in <listcomp>
blocks = [block_from_header_bytes(h, b)
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/pandas.py", line 162, in block_from_header_bytes
values = pnp.deserialize(pnp.decompress(bytes, dtype), dtype,
File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/numpy.py", line 159, in decompress
return decompress_text(bytes)
File "/home/krishnamurthy.sur/.local/lib/python3.9/site-packages/snappy/snappy.py", line 92, in uncompress
return _uncompress(data)
snappy.UncompressError: Error while decompressing: invalid input
Code:
# Reproduction script: read two CSV sources, left-join the address table onto
# the records twice (consignee and shipper), and write the result as CSV.
# `dd` and `dtype` are defined in a part of the script not shown here
# (presumably `import dask.dataframe as dd` -- the traceback goes through
# dask/dataframe).

# Main records; the trailing "/*" is a glob over the files inside the
# importyeti_apparel_indexed.csv directory.
df = dd.read_csv('/work/scdatahub/Data_Cleaning/Output_Data/importyeti_apparel_indexed.csv/*',
dtype=dtype, encoding="utf-8")
# Address lookup table, read from a single file with inferred dtypes.
addr = dd.read_csv('/work/scdatahub/Data_Cleaning/Input_Data/address.csv', encoding="utf-8")
# (intermediate code omitted from the post)
...
# First left join: match each record's prepped consignee address against the
# address table. Every address column is prefixed with "con_", so the join key
# on the right side is 'con_low_addr'.
merged = dd.merge(df, addr.add_prefix('con_'),
how='left',
left_on='prepped_consignee_addr', right_on='con_low_addr')
# Second left join: same address table, this time prefixed with "ship_" and
# matched against the prepped shipper address.
merged = dd.merge(merged, addr.add_prefix('ship_'),
how='left',
left_on='prepped_shipper_addr', right_on='ship_low_addr')
# Cast every column to the 'string' dtype before writing.
merged = merged.astype('string')
# This is the call at which the traceback surfaces. The failing frames are in
# dask/dataframe/shuffle.py (collect) -> partd -> snappy, i.e. while reading
# back shuffled partitions.
# NOTE(review): that suggests the shuffle comes from the merges above rather
# than from the CSV writing itself -- confirm.
# NOTE(review): snappy is loaded from ~/.local while dask/partd come from the
# shared anaconda install; a version mismatch is a possible cause -- verify.
merged.to_csv('/work/scdatahub/Data_Cleaning/Output_Data/apparel_geocoded.csv')