Sorting data that is larger than available memory

Although I know it’s expensive and not recommended, I still have to sort my data by column, ideally by multiple columns. The data is bigger than available memory, and while trying slightly different things I keep running into memory issues, mostly associated with P2P shuffles.

What are the best strategies for sorting data that is larger than available memory? And would you know an approach to mimic the Pandas multi-column sort?

In a few lines, this is what I’m trying to do:

    shorelines = dask_geopandas.read_parquet("/data/src/shorelines/*.parquet")

    # Ideally I would be able to sort by multiple columns, like you would in Pandas:
    # shorelines = shorelines.sort_values(["minx", "miny", "maxx", "maxy"])

    # But sorting by just one column would already be a good starting point:
    # shorelines = shorelines.sort_values("minx")
    
    # By index would also be fine
    shorelines = shorelines.set_index("minx")
    shorelines.to_parquet("/data/prc/shorelines")

These are the kind of errors I run into:

  File "/u/calkoen/miniforge3/envs/daskenv/lib/python3.11/site-packages/distributed/core.py", line 1394, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/u/calkoen/miniforge3/envs/daskenv/lib/python3.11/site-packages/distributed/core.py", line 1153, in send_recv
    response = await comm.read(deserializers=deserializers)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/u/calkoen/miniforge3/envs/daskenv/lib/python3.11/site-packages/distributed/comm/tcp.py", line 237, in read
    convert_stream_closed_error(self, e)
  File "/u/calkoen/miniforge3/envs/daskenv/lib/python3.11/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:53008 remote=tcp://127.0.0.1:37089>: Stream is closed
2024-02-16 03:56:18,020 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-7120df48ccf2acb5c22b0388f75eda25', 11)
Function:  shuffle_transfer
args:      (        box_id       time                                           geometry feature_type  source_file  group   size       quadkey       minx       miny       maxx       maxy  _partitions
0      BOX_093 1984-01-01  LINESTRING (8.70496 -1.35722, 8.70503 -1.35779...      Feature  BOX_093_016      0   4946  300001102233   8.702732  -1.362184   8.723711  -1.329801          170
1      BOX_093 1987-01-01  LINESTRING (8.73206 -1.35856, 8.73191 -1.35868...      Feature  BOX_093_016      1   2786  300001102233   8.702732  -1.358820   8.732062  -1.340866          170
2      BOX_093 2015-01-01  LINESTRING (8.84965 -0.84178, 8.84958 -0.84167...      Feature  BOX_093_016     19  20882      30000110   8.702732  -0.841784   8.849646  -0.618319          170
3      BOX_093 2017-01-01  LINESTRING (8.84965 -0.84178, 8.84958 -0.84167...      Feature  BOX_093_016     21  21362      30000110   8.702732  -0.841784   8.849646  -0.617510          170
4      BOX_093 2017-01-01  LINESTRING (8.70273 -0.67919, 8.
kwargs:    {}
Exception: "RuntimeError('P2P shuffling 7120df48ccf2acb5c22b0388f75eda25 failed during transfer phase')"

Hi @sirolf, welcome to Dask Discourse forum!

Yes, sorting is expensive, but Dask should be able to help you sort a dataset bigger than memory.
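On the multi-column part: as far as I know, Dask’s sort_values works best on a single column, but one common approximation (just a sketch, not tested on your data) is to do the expensive global sort on the leading column and then sort each partition by the remaining ones:

    # Sketch (untested): global sort on the leading column via set_index, then sort
    # rows inside each partition by the remaining columns. This approximates the
    # pandas multi-column sort; rows that share the same "minx" but land in
    # different partitions will not be globally ordered by the other columns.
    import dask_geopandas

    shorelines = dask_geopandas.read_parquet("/data/src/shorelines/*.parquet")
    shorelines = shorelines.set_index("minx")
    shorelines = shorelines.map_partitions(
        lambda df: df.sort_values(["miny", "maxx", "maxy"])
    )
    shorelines.to_parquet("/data/prc/shorelines")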

What is your setup: are you using a LocalCluster? How many workers and threads, and how much memory? How big is your input dataset?

Did you check the dashboard to understand more where the problem could be?
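In case it helps, this is roughly how I would start a LocalCluster with explicit limits so the dashboard shows where the memory goes (the worker counts and memory figures below are only placeholders to adapt to your machine):

    # Placeholders: tune n_workers, threads_per_worker and memory_limit to your machine.
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(
        n_workers=4,
        threads_per_worker=2,
        memory_limit="8GB",   # per worker, not total
    )
    client = Client(cluster)
    print(client.dashboard_link)  # open this URL to follow the shuffle live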

Hi @guillaumeeb, TY for your help! That was my experience as well - Dask should be able to do sorts, even though they’re expensive.

I’m understanding the problem a bit better now. I typically work with geo data, so my dataframes include geometry datatypes. It’s always a bit of trial and error between dask_geopandas, dask and different encodings (geometries, wkb, wkt) to make lazy computations work.

Although I always try to use recent releases, my pyarrow version was pinned < 12 for a while to avoid another issue. A few days ago I removed that constraint, so I’m now on pyarrow 14. I think this upgrade (or a recent Dask version) caused encodings to be handled differently between Dask and GeoParquet. In effect I ran into an error that shows up as "… P2P shuffle …", but I think underneath it’s related to geometry encoding. I’ll try to walk you through my use case:

I have a workflow where I’m creating geoparquet files from geopandas like this:

for i, fp in enumerate(fps):
    gdf = process(fp)
    gdf.to_parquet(OUT_DIR / f"part.{i}.parquet")

So now I have a directory that contains a bunch of geoparquet files. Now, with the versions below, I note the following behaviour:

dask                      2023.12.0          pyhd8ed1ab_0    conda-forge
dask-core                 2023.12.0          pyhd8ed1ab_0    conda-forge
dask-expr                 0.2.8              pyhd8ed1ab_0    conda-forge
dask-geopandas            0.3.1              pyhd8ed1ab_1    conda-forge
dask-glm                  0.3.2              pyhd8ed1ab_0    conda-forge
dask-labextension         7.0.0              pyhd8ed1ab_0    conda-forge
dask-ml                   2023.3.24          pyhd8ed1ab_1    conda-forge
...
pyarrow                   14.0.1          py311hd7bc329_10_cpu    conda-forge
pyarrow-hotfix            0.6                pyhd8ed1ab_0    conda-forge
import pathlib
import dask_geopandas
import dask.dataframe as dd
import geopandas as gpd
import pandas as pd

data_dir = pathlib.Path("")
gddf = dask_geopandas.read_parquet(data_dir / "*.parquet").compute()  # as expected
ddf = dd.read_parquet(data_dir / "*.parquet").compute()  # fails with byte error (below)
gdf = gpd.read_parquet(data_dir / "part.0.parquet") # as expected
df = pd.read_parquet(data_dir / "part.0.parquet") # works, with geometry col as wkb
 
In [14]: df2 = dd.read_parquet("/data/with/part.349.parquet").compute()
---------------------------------------------------------------------------
UnicodeDecodeError                        Traceback (most recent call last)
Cell In[14], line 1
----> 1 df2 = dd.read_parquet("/data/with/part.349.parquet").compute()

File ~/miniforge3/envs/daskenv/lib/python3.11/site-packages/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
    318 def compute(self, **kwargs):
    319     """Compute this dask collection
    320
    321     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    340     dask.compute
    341     """
--> 342     (result,) = compute(self, traverse=False, **kwargs)
    343     return result

File ~/miniforge3/envs/daskenv/lib/python3.11/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniforge3/envs/daskenv/lib/python3.11/site-packages/dask/dataframe/_pyarrow.py:82, in _to_string_dtype(df, dtype_check, index_check, string_dtype)
     79     dtypes = string_dtype
     81 if dtypes:
---> 82     df = df.astype(dtypes, copy=False)
     84 # Convert DataFrame/Series index too
     85 if (is_dataframe_like(df) or is_series_like(df)) and index_check(df.index):

File lib.pyx:747, in pandas._libs.lib.ensure_string_array()

File lib.pyx:816, in pandas._libs.lib.ensure_string_array()

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x90 in position 5: invalid start byte

With these versions I would not run into the byte encoding error when doing dd.read_parquet("*")

dask                      2023.10.1          pyhd8ed1ab_0    conda-forge
dask-cloudprovider        2022.10.0          pyhd8ed1ab_0    conda-forge
dask-core                 2023.10.1          pyhd8ed1ab_0    conda-forge
dask-gateway              2023.9.0           pyh8af1aa0_0    conda-forge
dask-geopandas            0.3.1              pyhd8ed1ab_1    conda-forge
dask-glm                  0.2.0                      py_1    conda-forge
dask-image                2023.8.1           pyhd8ed1ab_0    conda-forge
dask-labextension         7.0.0              pyhd8ed1ab_0    conda-forge
dask-ml                   2023.3.24          pyhd8ed1ab_1    conda-forge
pyarrow                   10.0.1          py311h1e679ab_50_cpu    conda-forge

Any idea what might cause this?

Maybe it is related to PyArrow strings now being used by default in Dask.

Does it change anything if you set:

dask.config.set({"dataframe.convert-string": False})
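For instance, something like this, set before reading the data (just a sketch reusing the path from your first post):

    import dask
    import dask.dataframe as dd

    # Keep arrow-backed object/binary columns as-is instead of converting them to
    # pyarrow strings, which seems to be what chokes on the raw WKB geometry bytes.
    dask.config.set({"dataframe.convert-string": False})

    ddf = dd.read_parquet("/data/src/shorelines/*.parquet")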

I’ll check that tomorrow, because at the moment my data is no longer in a geometry dtype. My primary goal is to do the sorting, so I converted the geometries to hex strings with df.geometry.to_wkb(hex=True) (rough sketch below). Now the bytes are no longer an issue, but…
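For reference, the per-file conversion I did looks roughly like this (file names are placeholders, column names as in my data):

    # Rough sketch: the geometry column becomes a hex-encoded WKB string so that
    # plain pandas/pyarrow can handle it without a geometry dtype.
    import geopandas as gpd
    import pandas as pd

    gdf = gpd.read_parquet("part.0.parquet")
    df = pd.DataFrame(gdf.drop(columns="geometry"))
    df["geometry"] = gdf.geometry.to_wkb(hex=True)
    df.to_parquet("part.0.wkb.parquet")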

When I try the simplest way of sorting, the process gets killed after a while:



    import pathlib
    import dask.dataframe as dd

    DATA_DIR = pathlib.Path("~/data/").expanduser()
    OUT_DIR = DATA_DIR.parent / (DATA_DIR.stem + "_sorted")
    if not OUT_DIR.exists():
        OUT_DIR.mkdir(parents=True, exist_ok=True)

    shorelines = dd.read_parquet(DATA_DIR / "*.parquet")
    shorelines = shorelines.set_index("quadkey")
    shorelines.to_parquet(OUT_DIR)

Some extra info about the data. Example for 1 partition:

In [2]: import dask.dataframe as dd
In [3]: df = dd.read_parquet("part-0.parquet")
In [4]: df
Out[4]:
Dask DataFrame Structure:
               box_id           time geometry source_file  group   size quadkey    minx    miny    maxx    maxy
npartitions=1
               string datetime64[ns]   string      string  int32  int32  string float64 float64 float64 float64
                  ...            ...      ...         ...    ...    ...     ...     ...     ...     ...     ...
Dask Name: read-parquet, 1 graph layer

In [5]: df.dtypes
Out[5]:
box_id         string[pyarrow]
time            datetime64[ns]
geometry       string[pyarrow]
source_file    string[pyarrow]
group                    int32
size                     int32
quadkey        string[pyarrow]
minx                   float64
miny                   float64
maxx                   float64
maxy                   float64
dtype: object

Do you know how I might fix my sort? Or what would you recommend to check?

Do you know why the process is getting killed? Do you get any errors?

I would recommend monitoring the execution using a LocalCluster and its Dashboard. But this might not be sufficient.
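If forwarding the dashboard is not practical, you could also dump a performance report to an HTML file and inspect it afterwards. A minimal sketch, reusing the paths from your snippet (the report filename is a placeholder):

    import pathlib

    import dask.dataframe as dd
    from dask.distributed import Client, performance_report

    DATA_DIR = pathlib.Path("~/data/").expanduser()
    OUT_DIR = DATA_DIR.parent / (DATA_DIR.stem + "_sorted")

    client = Client()  # or connect to your existing scheduler

    # All work done inside this block ends up in the HTML report, which can be
    # copied off the cluster and opened in a browser.
    with performance_report(filename="sort-report.html"):
        shorelines = dd.read_parquet(DATA_DIR / "*.parquet")
        shorelines.set_index("quadkey").to_parquet(OUT_DIR)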

Yes, sorry, just providing a "killed" output isn’t very helpful… I’m running this on a remote cluster that I’m not very familiar with, so I’m not very keen on setting up dashboard forwarding etc., although I can maybe save the worker logs somewhere.

I guess it’s related to the geometries, which vary substantially in size (shorelines). Moreover, the sort by quadkey will result in very unequal partition sizes: areas in Norway will be very large, whereas areas with straight coastlines (West Coast Africa) will be relatively small. Do you think that could be a problem for Dask? (A sketch of how I could estimate the skew is below.)
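    # Rough sketch (untested) to get a feel for how skewed the quadkeys are
    # before sorting: counts rows per quadkey across all partitions.
    import pathlib

    import dask.dataframe as dd

    DATA_DIR = pathlib.Path("~/data/").expanduser()
    shorelines = dd.read_parquet(DATA_DIR / "*.parquet")

    counts = shorelines["quadkey"].value_counts().compute()
    print(counts.head(20))    # the biggest quadkeys
    print(counts.describe())  # overall spread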

I’ll add more: can you try on a subset of the dataset in order to get it all working, or on a LocalCluster, before going to a remote one you’re not familiar with?

Well, it might, especially if several geometries share the same quadkey; I’m not sure Dask will be able to split those across several partitions.
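If the P2P shuffle itself keeps failing, one more thing you could try (just a sketch, and the keyword has been renamed between Dask versions, so check yours) is falling back to the older task-based shuffle, first on a small subset of partitions:

    import pathlib

    import dask.dataframe as dd

    DATA_DIR = pathlib.Path("~/data/").expanduser()
    OUT_DIR = DATA_DIR.parent / (DATA_DIR.stem + "_sorted")

    shorelines = dd.read_parquet(DATA_DIR / "*.parquet")

    # Start with a handful of partitions to validate the pipeline end to end.
    subset = shorelines.partitions[:10]

    # Fall back to the task-based shuffle instead of P2P. Depending on your Dask
    # version the keyword is `shuffle=` or `shuffle_method=`.
    subset = subset.set_index("quadkey", shuffle="tasks")
    subset.to_parquet(OUT_DIR)

If that works on the subset, you can scale up gradually while keeping an eye on worker memory.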