Hi, I have a routine that extracts a lot of features from large images and stores these features in an SQLite database.
I'm trying to implement a version with dask; it is able to process the data, but it always crashes before inserting the data into the database. Note that the database is created but stays empty.
My program looks like this:
import uuid

import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, LocalCluster
from toolz import curry
...
@dask.delayed(nout=1, pure=True)
@curry
def _process(...) -> pd.DataFrame:
    ...
def main(...) -> None:
    ...
    worker_directory = f'/tmp/{uuid.uuid4().hex}'
    cluster = LocalCluster(
        n_workers=n_workers,
        threads_per_worker=1,
        processes=True,
        local_directory=worker_directory,
    )
    client = Client(cluster)

    # partially apply the curried function, then build one delayed task per time point
    process = _process(<parameters>)
    df = dd.from_delayed([process(t) for t in range(nb_time_pts)])

    # db_path: connection URI for the SQLite database
    df.to_sql('table', db_path, parallel=True, method='multi')

    client.close()
Ideally, I would like to simplify the task graph so that _process
and the SQL insert become a single task, but the default case should work first.
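For reference, the sketch below is roughly what I have in mind for that fused version; extract_features, db_path and table are placeholders for my real code, and I'm aware that SQLite only allows one writer at a time, so the concurrent appends might still need a lock or retries:

import sqlite3

import dask
import pandas as pd


@dask.delayed(pure=False)
def _process_and_insert(t: int, db_path: str, table: str) -> int:
    # placeholder for the real feature extraction of one time point
    df: pd.DataFrame = extract_features(t)
    # each task opens its own connection and appends its own partition
    con = sqlite3.connect(db_path)
    df.to_sql(table, con, if_exists='append', index=False)
    con.close()
    return len(df)


def run(nb_time_pts: int, db_path: str, table: str) -> None:
    tasks = [_process_and_insert(t, db_path, table) for t in range(nb_time_pts)]
    dask.compute(*tasks)

Each task writing its own result keeps the graph to one task per time point, which is the simplification I'm after.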
Thanks in advance,