Error while decompressing: invalid input

Hi all! I’m trying to load two CSV files into dataframes, perform left joins using dd.merge, and save the result back into a CSV file. However, I encounter an “Error while decompressing: invalid input” error, and I’m not able to identify its source. Loading the two CSV files separately and saving them back as CSV works fine.

Appreciate any help on this!

Stacktrace:

Traceback (most recent call last):
  File "/work/scdatahub/Data_Cleaning/Python_Function/geocode - apparel.py", line 102, in <module>
    merged.to_csv('/work/scdatahub/Data_Cleaning/Output_Data/apparel_geocoded.csv')
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/dataframe/core.py", line 1560, in to_csv
    return to_csv(self, filename, **kwargs)
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/dataframe/io/csv.py", line 957, in to_csv
    return list(dask.compute(*values, **compute_kwargs))
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/base.py", line 573, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/threaded.py", line 81, in get
    results = get_async(
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/local.py", line 506, in get_async
    raise_exception(exc, tb)
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/local.py", line 314, in reraise
    raise exc
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/local.py", line 219, in execute_task
    result = _execute_task(task, data)
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/dataframe/shuffle.py", line 815, in collect
    res = p.get(part)
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/core.py", line 73, in get
    return self.get([keys], **kwargs)[0]
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/core.py", line 79, in get
    return self._get(keys, **kwargs)
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/encode.py", line 29, in _get
    return [self.join([self.decode(frame) for frame in framesplit(chunk)])
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/encode.py", line 29, in <listcomp>
    return [self.join([self.decode(frame) for frame in framesplit(chunk)])
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/encode.py", line 29, in <listcomp>
    return [self.join([self.decode(frame) for frame in framesplit(chunk)])
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/pandas.py", line 201, in deserialize
    blocks = [block_from_header_bytes(h, b)
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/pandas.py", line 201, in <listcomp>
    blocks = [block_from_header_bytes(h, b)
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/pandas.py", line 162, in block_from_header_bytes
    values = pnp.deserialize(pnp.decompress(bytes, dtype), dtype,
  File "/shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/numpy.py", line 159, in decompress
    return decompress_text(bytes)
  File "/home/krishnamurthy.sur/.local/lib/python3.9/site-packages/snappy/snappy.py", line 92, in uncompress
    return _uncompress(data)
snappy.UncompressError: Error while decompressing: invalid input

Code:

import dask.dataframe as dd

# dtype mapping is defined earlier in the script (elided here)
df = dd.read_csv('/work/scdatahub/Data_Cleaning/Output_Data/importyeti_apparel_indexed.csv/*',
                 dtype=dtype, encoding="utf-8")

addr = dd.read_csv('/work/scdatahub/Data_Cleaning/Input_Data/address.csv', encoding="utf-8")

...

# Left-join the address table twice: once on the consignee address,
# once on the shipper address
merged = dd.merge(df, addr.add_prefix('con_'),
                  how='left',
                  left_on='prepped_consignee_addr', right_on='con_low_addr')

merged = dd.merge(merged, addr.add_prefix('ship_'),
                  how='left',
                  left_on='prepped_shipper_addr', right_on='ship_low_addr')
merged = merged.astype('string')
merged.to_csv('/work/scdatahub/Data_Cleaning/Output_Data/apparel_geocoded.csv')

Hi @suryak, welcome to the Dask community!

First, are you using Dask Distributed? What kind of Scheduler/Cluster Manager are you using?

Merge implies shuffling the data between workers, and it seems the Workers or Scheduler aren’t speaking the same language here when exchanging data…

It’s a little hard to tell, but I did notice something in your stacktrace: the snappy library is loaded from your local environment (/home/krishnamurthy.sur/.local/lib/python3.9/site-packages), while everything else comes from the shared install.

Maybe this local environment is not compatible with the main one in /shared/centos7/anaconda3/2022.05.

You should try with a simple threaded Scheduler to see if that fixes the problem, and then check your environment.
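
For example (a minimal sketch; note that to_csv also accepts a scheduler keyword, as you can see in its signature in your traceback):

import dask

# Force the local threaded scheduler globally...
dask.config.set(scheduler='threads')

# ...or only for the failing call:
merged.to_csv('/work/scdatahub/Data_Cleaning/Output_Data/apparel_geocoded.csv',
              scheduler='threads')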

That’s a good catch! I’m using Slurm on my university’s HPC cluster, and I submit my job with the sbatch command. I will give my environment a closer look. Thank you!
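
Something like this should show where each library in the serialization path is loading from (a quick, untested sketch):

import dask, pandas, partd, snappy

# Print the install location of every library involved in the
# shuffle/serialization path, to spot any ~/.local stragglers
for mod in (dask, pandas, partd, snappy):
    print(mod.__name__, '->', mod.__file__)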

I fixed the libraries so they’re all loaded from the same environment, but the error persists:

---------------------------------------------------------------------------
UncompressError                           Traceback (most recent call last)
File <timed eval>:1, in <module>

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/dataframe/core.py:1560, in _Frame.to_csv(self, filename, **kwargs)
   1557 """See dd.to_csv docstring for more information"""
   1558 from .io import to_csv
-> 1560 return to_csv(self, filename, **kwargs)

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/dataframe/io/csv.py:957, in to_csv(df, filename, single_file, encoding, mode, name_function, compression, compute, scheduler, storage_options, header_first_partition_only, compute_kwargs, **kwargs)
    953         compute_kwargs["scheduler"] = scheduler
    955     import dask
--> 957     return list(dask.compute(*values, **compute_kwargs))
    958 else:
    959     return values

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/base.py:573, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    570     keys.append(x.__dask_keys__())
    571     postcomputes.append(x.__dask_postcompute__())
--> 573 results = schedule(dsk, keys, **kwargs)
    574 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/threaded.py:81, in get(dsk, result, cache, num_workers, pool, **kwargs)
     78     elif isinstance(pool, multiprocessing.pool.Pool):
     79         pool = MultiprocessingPoolExecutor(pool)
---> 81 results = get_async(
     82     pool.submit,
     83     pool._max_workers,
     84     dsk,
     85     result,
     86     cache=cache,
     87     get_id=_thread_get_id,
     88     pack_exception=pack_exception,
     89     **kwargs,
     90 )
     92 # Cleanup pools associated to dead threads
     93 with pools_lock:

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/local.py:506, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    504         _execute_task(task, data)  # Re-execute locally
    505     else:
--> 506         raise_exception(exc, tb)
    507 res, worker_id = loads(res_info)
    508 state["cache"][key] = res

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/local.py:314, in reraise(exc, tb)
    312 if exc.__traceback__ is not tb:
    313     raise exc.with_traceback(tb)
--> 314 raise exc

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/local.py:219, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    217 try:
    218     task, data = loads(task_info)
--> 219     result = _execute_task(task, data)
    220     id = get_id()
    221     result = dumps((result, id))

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/dask/dataframe/shuffle.py:815, in collect(p, part, meta, barrier_token)
    813 """Collect partitions from partd, yield dataframes"""
    814 with ensure_cleanup_on_exception(p):
--> 815     res = p.get(part)
    816     return res if len(res) > 0 else meta

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/core.py:73, in Interface.get(self, keys, **kwargs)
     71 def get(self, keys, **kwargs):
     72     if not isinstance(keys, list):
---> 73         return self.get([keys], **kwargs)[0]
     74     elif any(isinstance(key, list) for key in keys):  # nested case
     75         flatkeys = list(flatten(keys))

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/core.py:79, in Interface.get(self, keys, **kwargs)
     77     return nested_get(keys, dict(zip(flatkeys, result)))
     78 else:
---> 79     return self._get(keys, **kwargs)

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/encode.py:29, in Encode._get(self, keys, **kwargs)
     27 def _get(self, keys, **kwargs):
     28     raw = self.partd._get(keys, **kwargs)
---> 29     return [self.join([self.decode(frame) for frame in framesplit(chunk)])
     30             for chunk in raw]

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/encode.py:29, in <listcomp>(.0)
     27 def _get(self, keys, **kwargs):
     28     raw = self.partd._get(keys, **kwargs)
---> 29     return [self.join([self.decode(frame) for frame in framesplit(chunk)])
     30             for chunk in raw]

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/encode.py:29, in <listcomp>(.0)
     27 def _get(self, keys, **kwargs):
     28     raw = self.partd._get(keys, **kwargs)
---> 29     return [self.join([self.decode(frame) for frame in framesplit(chunk)])
     30             for chunk in raw]

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/pandas.py:201, in deserialize(bytes)
    198 bytes = frames[1:]
    199 axes = [index_from_header_bytes(headers[0], bytes[0]),
    200         index_from_header_bytes(headers[1], bytes[1])]
--> 201 blocks = [block_from_header_bytes(h, b)
    202           for (h, b) in zip(headers[2:], bytes[2:])]
    203 return pd.DataFrame(create_block_manager_from_blocks(blocks, axes))

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/pandas.py:201, in <listcomp>(.0)
    198 bytes = frames[1:]
    199 axes = [index_from_header_bytes(headers[0], bytes[0]),
    200         index_from_header_bytes(headers[1], bytes[1])]
--> 201 blocks = [block_from_header_bytes(h, b)
    202           for (h, b) in zip(headers[2:], bytes[2:])]
    203 return pd.DataFrame(create_block_manager_from_blocks(blocks, axes))

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/pandas.py:162, in block_from_header_bytes(header, bytes)
    160     values = pickle.loads(bytes)
    161 else:
--> 162     values = pnp.deserialize(pnp.decompress(bytes, dtype), dtype,
    163                              copy=True).reshape(shape)
    164 if extension_type == 'categorical_type':
    165     values = pd.Categorical.from_codes(values,
    166                                        extension_values[1],
    167                                        ordered=extension_values[0])

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/partd/numpy.py:159, in decompress(bytes, dtype)
    157 def decompress(bytes, dtype):
    158     if dtype == 'O':
--> 159         return decompress_text(bytes)
    160     else:
    161         return decompress_bytes(bytes)

File /shared/centos7/anaconda3/2022.05/lib/python3.9/site-packages/snappy/snappy.py:92, in uncompress(data, decoding)
     90 if decoding:
     91     return _uncompress(data).decode(decoding)
---> 92 return _uncompress(data)

UncompressError: Error while decompressing: invalid input

I also set the config like so:

dask.config.set(scheduler='threads')

Well, this will be a little hard to investigate without a Minimum Reproducible Example. Do you think you could build one with part of your data, or with fake data? Ideally, an example using a LocalCluster, so everyone can reproduce it on their laptop.
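
Something along these lines would be a good skeleton (a minimal sketch with fake data; the column values here are made up, so adjust them to match your real files):

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    # Fake stand-ins for the two real CSV inputs
    df = dd.from_pandas(
        pd.DataFrame({'prepped_consignee_addr': ['a', 'b', 'c'] * 100,
                      'prepped_shipper_addr': ['b', 'c', 'a'] * 100}),
        npartitions=4)
    addr = dd.from_pandas(
        pd.DataFrame({'low_addr': ['a', 'b', 'c'],
                      'lat': [1.0, 2.0, 3.0],
                      'lon': [4.0, 5.0, 6.0]}),
        npartitions=2)

    # Same double left-join as in your script
    merged = dd.merge(df, addr.add_prefix('con_'),
                      how='left',
                      left_on='prepped_consignee_addr', right_on='con_low_addr')
    merged = dd.merge(merged, addr.add_prefix('ship_'),
                      how='left',
                      left_on='prepped_shipper_addr', right_on='ship_low_addr')
    merged.astype('string').to_csv('apparel_geocoded-*.csv')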