Although I know it’s expensive and not recommended, I still have to sort my data by column, ideally by multiple columns. The data is bigger than available memory, and while trying slightly different approaches I keep running into memory issues, mostly associated with P2P shuffles.
What are the best strategies for sorting data that is larger than available memory? And would you know an approach to mimic Pandas’ multi-column sort?
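One thing I’ve been considering is switching from the P2P shuffle to the older task-based shuffle, since the failures all come from P2P. This is just a sketch; I’m assuming the `dataframe.shuffle.method` config key applies to my Dask version:

```python
import dask

# Force the task-based shuffle instead of P2P (assumption: this config
# key is honored by the Dask version in use; older versions accepted a
# shuffle= keyword on set_index/sort_values instead).
dask.config.set({"dataframe.shuffle.method": "tasks"})
```

But I’m not sure whether the task-based shuffle would fare any better memory-wise here.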
In a few lines, this is what I’m trying to do:
shorelines = dask_geopandas.read_parquet("/data/src/shorelines/*.parquet")
# Ideally I would be able to sort the multiple columns, like you would do in Pandas as:
# shorelines = shorelines.sort_values(["minx", "miny", "maxx", "maxy"])
# But just by one column would already be a good starting point:
# shorelines = shorelines.sort_values("minx")
# By index would also be fine
shorelines = shorelines.set_index("minx")
shorelines.to_parquet("/data/prc/shorelines")
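To mimic the multi-column sort with a single `set_index`, I was thinking of building one composite key column whose lexicographic order matches the multi-column order, and sorting/indexing by that instead. A minimal sketch with plain pandas (the `_key` column name and the small frame are just for illustration; whether Dask’s `set_index` handles a tuple-valued column well is part of my question):

```python
import pandas as pd

# Stand-in for the shorelines data
df = pd.DataFrame({
    "minx": [8.7, 8.7, 8.6],
    "miny": [-1.36, -1.40, -0.84],
    "val": [1, 2, 3],
})

# A single composite key: tuples compare lexicographically, so sorting
# by this one column reproduces the multi-column sort order.
df["_key"] = list(zip(df["minx"], df["miny"]))
by_key = df.sort_values("_key").drop(columns="_key").reset_index(drop=True)
by_cols = df.drop(columns="_key").sort_values(["minx", "miny"]).reset_index(drop=True)
assert by_key.equals(by_cols)
```

In Dask I would then do `set_index` on that key column so only one shuffle is needed, but I don’t know if an object-dtype tuple column plays nicely with divisions.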
These are the kinds of errors I run into:
File "/u/calkoen/miniforge3/envs/daskenv/lib/python3.11/site-packages/distributed/core.py", line 1394, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/u/calkoen/miniforge3/envs/daskenv/lib/python3.11/site-packages/distributed/core.py", line 1153, in send_recv
response = await comm.read(deserializers=deserializers)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/u/calkoen/miniforge3/envs/daskenv/lib/python3.11/site-packages/distributed/comm/tcp.py", line 237, in read
convert_stream_closed_error(self, e)
File "/u/calkoen/miniforge3/envs/daskenv/lib/python3.11/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:53008 remote=tcp://127.0.0.1:37089>: Stream is closed
2024-02-16 03:56:18,020 - distributed.worker - WARNING - Compute Failed
Key: ('shuffle-transfer-7120df48ccf2acb5c22b0388f75eda25', 11)
Function: shuffle_transfer
args: ( box_id time geometry feature_type source_file group size quadkey minx miny maxx maxy _partitions
0 BOX_093 1984-01-01 LINESTRING (8.70496 -1.35722, 8.70503 -1.35779... Feature BOX_093_016 0 4946 300001102233 8.702732 -1.362184 8.723711 -1.329801 170
1 BOX_093 1987-01-01 LINESTRING (8.73206 -1.35856, 8.73191 -1.35868... Feature BOX_093_016 1 2786 300001102233 8.702732 -1.358820 8.732062 -1.340866 170
2 BOX_093 2015-01-01 LINESTRING (8.84965 -0.84178, 8.84958 -0.84167... Feature BOX_093_016 19 20882 30000110 8.702732 -0.841784 8.849646 -0.618319 170
3 BOX_093 2017-01-01 LINESTRING (8.84965 -0.84178, 8.84958 -0.84167... Feature BOX_093_016 21 21362 30000110 8.702732 -0.841784 8.849646 -0.617510 170
4 BOX_093 2017-01-01 LINESTRING (8.70273 -0.67919, 8.
kwargs: {}
Exception: "RuntimeError('P2P shuffling 7120df48ccf2acb5c22b0388f75eda25 failed during transfer phase')"