Distributed insertion into a SQL database

Hi, I have a routine that extracts a large number of features from large images and stores them in an SQLite database.

I’m trying to implement a version with Dask. It’s able to process the data, but it always crashes before inserting the data into the database. Note that the database is created but stays empty.

My program looks like this:

from toolz import curry
import dask
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import pandas as pd  # used in the pd.DataFrame return annotation below
import uuid          # used for the per-run worker directory

...

@dask.delayed(nout=1, pure=True)
@curry
def _process(...) -> pd.DataFrame:
    ...

def main(...) -> None:
    ...
    worker_directory = f'/tmp/{uuid.uuid4().hex}'
    cluster = LocalCluster(
        n_workers=n_workers,
        threads_per_worker=1,
        processes=True,
        local_directory=worker_directory,
    )
    client = Client(cluster)

    process = _process(<parameters>)

    df = dd.from_delayed([process(t) for t in range(nb_time_pts)])

    df.to_sql('table', db_path, parallel=True, method='multi')

    client.close()

Ideally, I would like to simplify the task graph so that _process and the SQL insert become a single task (roughly as sketched below), but the default case should work first.
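For reference, this is roughly the fused version I have in mind. It’s only a sketch: the DataFrame here is a stand-in for the real feature extraction, and the table name and URI are made up.

import dask
import pandas as pd
from sqlalchemy import create_engine

@dask.delayed
def process_and_insert(t, db_uri):
    # Stand-in for the real feature extraction on time point t.
    df = pd.DataFrame({'time': [t], 'feature': [float(t)]})
    engine = create_engine(db_uri)
    # Each task appends its own rows. SQLite allows only one writer at a
    # time, so concurrent appends may contend for the database lock.
    df.to_sql('features', engine, if_exists='append', index=False)
    engine.dispose()

tasks = [process_and_insert(t, 'sqlite:///features.db') for t in range(10)]
dask.compute(*tasks)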

Thanks in advance,

@JoOkuma Welcome, and thanks for the question!

I’m not able to reproduce this with some toy data; would you be able to share a minimal example?

Here’s what I ran:

from toolz import curry
from dask.distributed import Client, LocalCluster
from dask.utils import tmpfile
from sqlalchemy import create_engine

import dask
import pandas as pd
import dask.dataframe as dd


@dask.delayed(nout=1, pure=True)
@curry
def process():
    df = pd.DataFrame({
        'x': range(5),
        'y': range(5),
    })
    return df

client = Client(n_workers=2, threads_per_worker=1, processes=True)

df = dd.from_delayed([process() for t in range(3)])

with tmpfile() as f:
    db = 'sqlite:///%s' % f
    df.to_sql('test', db, parallel=True, method='multi')
    engine = create_engine(db, echo=False)
    result = engine.execute("SELECT * FROM test").fetchall()

result # Works
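One side note: engine.execute is the SQLAlchemy 1.x API. If you’re on SQLAlchemy 2.0+, where it was removed, the query at the end would look something like this instead:

from sqlalchemy import create_engine, text

engine = create_engine(db, echo=False)
with engine.connect() as conn:
    # SQLAlchemy 2.0 style: execute a textual query on a connection.
    result = conn.execute(text("SELECT * FROM test")).fetchall()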

Thanks, @pavithraes. I’m still trying to find a minimal example. I’ve pinpointed that the problem is actually with pybind11 in one of the image processing libraries: it errors when freeing some memory under multiprocessing.
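In case it helps anyone debugging something similar: a quick way to test whether a native extension is misbehaving under multiprocessing is to run the same pipeline with a threads-only client and see if the crash disappears.

from dask.distributed import Client

# Threads-only workers run inside the main process; if the crash goes away
# here but comes back with processes=True, the extension's interaction with
# multiprocessing is the likely culprit.
client = Client(processes=False)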
