Uploading a Dask DataFrame to a BigQuery table using the streaming BQ API (bigquery.Client)

Hi everyone! I’m trying to figure out whether I’m on the right track and whether I’m missing or overlooking something.

I’m trying to upload a Dask DataFrame, built from data pulled out of a Postgres database, to a BigQuery table.

My goal is to parallelize the pull from Postgres, distribute the data across a cluster (I was thinking about using a CUDA cluster of GPUs), cast, clean and wrangle it, and then upload it to BigQuery.

I’ll list the steps of my approach, then provide some code and the error it returns:

  1. Pull data from PostgreSQL using dd.read_sql_table(), providing the full SQLAlchemy URI and a meta argument to directly infer the schema (for the meta argument I provide an empty DataFrame with all columns as objects).

  2. Cast columns to float, string, datetime and boolean.

  3. Upload to a BigQuery table, inferring the schema with bigquery.LoadJobConfig(autodetect=True) and load_table_from_dataframe().

I’m aware that somewhere in my pipeline, when the data is being uploaded, it must be converted into Arrow data types, so I cast strings as UInt64 (I don’t know whether that’s right or not).

This is my code:

import dask.dataframe as dd
from google.cloud import bigquery
from google.oauth2 import service_account

# Build the extractor and an empty "meta" frame describing the columns
extract = ExtractPostgreSQL(username=username, password=password, host=host, port=port,
                            database=database, dialect=dialect, driver=driver, table=table_name)
df_to_cast = extract.get_meta_rawdf()
df_to_cast.drop(labels='id', axis=1, inplace=True)

# Pull the table from Postgres into a Dask DataFrame
df = dd.read_sql_table(table_name=table_name, index_col='id',
                       con=f'{dialect}+{driver}://{username}:{password}@{host}:{port}/{database}',
                       meta=df_to_cast)
df = df.fillna(0)
df = df.reset_index()
df.columns = df.columns.str.replace('index', 'id')

# Cast columns to the target dtypes
extract.cast_columns_to_floats(df_raw=df)
extract.cast_columns_to_strings(df_raw=df)
extract.cast_columns_to_datetime(df_raw=df)
extract.cast_columns_to_bool(df_raw=df)

# Upload to BigQuery
credentials = service_account.Credentials.from_service_account_file(PATHS.GCP_KEYS)
client = bigquery.Client(project='datalakedesarrollo', credentials=credentials)
job_config = bigquery.LoadJobConfig(autodetect=True)
client.load_table_from_dataframe(dataframe=df, destination='datalakedesarrollo.dwh_testing_1.test6',
                                 job_config=job_config)

This is the error:

pyarrow.lib.ArrowInvalid: Could not convert Dask Series Structure:
npartitions=1
    string
       ...
Name: name, dtype: string
Dask Name: try_loc, 124 graph layers with type Series: did not recognize Python value type when inferring an Arrow data type

Some methods from the ExtractPostgreSQL class:

    def cast_columns_to_floats(self, df_raw: pd.DataFrame) -> None:
        # Columns whose Postgres type is integer-like (or uuid) are cast to float
        list_of_numeric_cols = list(self.df_dtypes.loc[
            self.df_dtypes['data_type'].isin(['integer', 'bigint', 'uuid']), 'column_name'].values)
        for col in list_of_numeric_cols:
            df_raw[col] = df_raw[col].astype(float)

    def cast_columns_to_strings(self, df_raw: pd.DataFrame) -> None:
        # Text-like Postgres types are cast to the pandas 'string' dtype
        list_of_string_columns = list(self.df_dtypes.loc[
            self.df_dtypes['data_type'].isin(['text', 'character varying', 'jsonb']), 'column_name'].values)
        for col in list_of_string_columns:
            df_raw[col] = df_raw[col].astype('string')

    def cast_columns_to_bool(self, df_raw: pd.DataFrame) -> None:
        # Boolean Postgres columns are cast to bool
        list_of_bool_cols = list(self.df_dtypes.loc[
            self.df_dtypes['data_type'].isin(['boolean']), 'column_name'].values)
        for col in list_of_bool_cols:
            df_raw[col] = df_raw[col].astype(bool)
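
The cast_columns_to_datetime method called above follows the same self.df_dtypes pattern; roughly, it looks like this (a sketch from memory, the Postgres type names in the filter are approximate):

    def cast_columns_to_datetime(self, df_raw: pd.DataFrame) -> None:
        # Same pattern as the other cast_* methods; the type names below are approximate
        list_of_datetime_cols = list(self.df_dtypes.loc[
            self.df_dtypes['data_type'].isin(['timestamp without time zone', 'timestamp with time zone', 'date']),
            'column_name'].values)
        for col in list_of_datetime_cols:
            df_raw[col] = df_raw[col].astype('datetime64[ns]')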

I came up with a possible alternative solution using map_partitions() and insert_rows_from_dataframe():

try:
    df.map_partitions(lambda d: client.insert_rows_from_dataframe(table=table, dataframe=d, selected_fields=schema), meta=df_to_cast).compute()
    #gc.collect()
except StopIteration:
    pass

This approach worked, but when I try to handle big tables (40-50 GB) it crashes and I run out of memory. I was thinking of defining a wrapper function and uploading the big tables partition by partition (one pandas DataFrame at a time) with map_partitions, but I don’t know if it’s the best approach in terms of performance or speed.

I’m all ears and thanks in advance.

Hi @cremerf, welcome to Dask community!

You should be able to read the data with the correct types using appropriate metadata, I think, avoiding the need to cast things afterwards.

I’m not really familiar with Arrow, but this (casting strings as UInt64) sounds strange to me. Why aren’t you using the Arrow STRING type?

Why are you doing this operation (the reset_index and column rename)?

Could you give the complete stack trace?

The question is, can bigquery.Client handle a Dask DataFrame? If not, you’ll probably have to go with map_partitions, but there might be some other problem with BigQuery writing concurrently to the same table.
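
If bigquery.Client only accepts pandas objects, something along these lines might work, going through to_delayed so each task gets a plain pandas partition (an untested sketch; the destination table is a placeholder and credentials handling is omitted):

import dask
from google.cloud import bigquery

def upload_partition(pdf, table_id):
    # pdf is an ordinary pandas DataFrame here, which the BigQuery client accepts
    client = bigquery.Client()  # built inside the task so it isn't serialized; add credentials as needed
    job_config = bigquery.LoadJobConfig(autodetect=True)
    job = client.load_table_from_dataframe(pdf, table_id, job_config=job_config)
    job.result()  # block until the load job finishes so failures surface in the task
    return len(pdf)

# 'project.dataset.table' is a placeholder destination
tasks = [dask.delayed(upload_partition)(part, 'project.dataset.table') for part in df.to_delayed()]
row_counts = dask.compute(*tasks)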

Hi @guillaumeeb, thank you for your reply.

> You should be able to read the data with the correct types using appropriate metadata, I think, avoiding the need to cast things afterwards.

Thank you, now I build the ‘meta_df’ with the desired dtypes and everything works.
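
Roughly, the meta construction now looks like this: an empty pandas DataFrame whose columns already carry the target dtypes, so read_sql_table infers them directly (the column names and dtypes below are just an illustration, not my real schema, and connection_uri stands for the SQLAlchemy URI from my first post):

import pandas as pd
import dask.dataframe as dd

# Illustrative columns and dtypes; the real ones come from ExtractPostgreSQL
meta = pd.DataFrame({
    'amount': pd.Series(dtype='float64'),
    'name': pd.Series(dtype='string'),
    'created_at': pd.Series(dtype='datetime64[ns]'),
    'is_active': pd.Series(dtype='bool'),
})
# note: the meta must not include the 'id' column used as index_col

df = dd.read_sql_table(
    table_name=table_name,
    index_col='id',
    con=connection_uri,   # placeholder for the SQLAlchemy URI
    meta=meta,            # dtypes are taken from here, so no casting afterwards
)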

> I’m not really familiar with Arrow, but this (casting strings as UInt64) sounds strange to me. Why aren’t you using the Arrow STRING type?

I’ve tried to cast string columns to Arrow string columns, but when I compute, Dask interprets those columns as ‘object’.
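
For reference, this is roughly the kind of cast I tried (a tiny pandas-only example to show the dtype alias; it needs pyarrow installed, and on the Dask side the columns still came back as object for me):

import pandas as pd

# pandas-only illustration of the Arrow-backed string dtype alias
s = pd.Series(['a', 'b', None])
s = s.astype('string[pyarrow]')   # requires pyarrow to be installed
print(s.dtype)                    # reports string[pyarrow] in plain pandas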

> Why are you doing this operation (the reset_index and column rename)?

When pulling down from Postgres I need to provide an index column. I don’t know why, but when doing this I also have to provide the ‘meta_df’ without the index column I declared before. So I reset the index and rename ‘index’ back to ‘id’.

> The question is, can bigquery.Client handle a Dask DataFrame? If not, you’ll probably have to go with map_partitions, but there might be some other problem with BigQuery writing concurrently to the same table.

Fortunately, I was able to upload my Dask DataFrame to BigQuery using map_partitions and load_table_from_dataframe, loading one partition at a time. To avoid exhausting memory I used a LocalCUDACluster, but the process was too slow, so I shifted to LocalCluster(), and now I have this issue with my memory:

This is how I coded it:

from dask.distributed import Client, LocalCluster
from dask_cuda import LocalCUDACluster
from test import main

def main_run():

    main()

if __name__ == '__main__':
    from multiprocessing import freeze_support
    freeze_support()

    cluster = LocalCluster()
    client = Client(cluster)
    print(client.dashboard_link)
    main_run()
> 2023-04-05 10:42:31,006 - distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.95 GiB -- Worker memory limit: 3.10 GiB
> 2023-04-05 10:42:31,048 - distributed.worker_memory - WARNING - Worker tcp://127.0.0.1:46501 (pid=3123) exceeded 95% memory budget. Restarting...
> 2023-04-05 10:42:31,132 - distributed.nanny - WARNING - Restarting worker

After this warning repeated many times, the cluster shut down with this error:

distributed.scheduler.KilledWorker: Attempted to run task ('read_sql_chunk-lambda-be63948399b13fb223856070d9f123ac', 18) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:37355. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
2023-04-05 10:44:27,810 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2023-04-05 10:44:27,810 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-04-05 10:44:27,810 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-04-05 10:44:27,811 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing

How can I cap the memory usage of my cluster? Or avoid the workers being killed because memory is exhausted?

The memory usage for your cluster is already capped, hence the error. But I know this is not what you meant.

Somehow, your workflow is keeping too much data in worker memory, which is weird because it looks embarrassingly parallel. Normally, each time a worker inserts a partition into BigQuery, the memory occupied by that chunk should be freed.

Is the code in your first post still valid? Did you look at the dashboard to get some insight into what was happening?

Did you try writing the results to another output, a Parquet file for example?
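
For example, something like the snippet below would isolate whether the memory pressure comes from the reading side or from the BigQuery upload side (the output path is just a placeholder):

# Placeholder path; writes one Parquet file per partition instead of uploading.
# If this also exhausts memory, the problem is on the read_sql_table side;
# if it completes, the issue is in the upload code path.
df.to_parquet('/tmp/debug_output/', write_index=False)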

I added chunk division by megabytes to avoid exhausting memory, but it still throws that error.

import numpy as np
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter
import dask.dataframe as dd

def upload_dataframe_to_bq(dataframe, table_id, job_config):
    job = client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.done()  # note: done() only checks the job status; job.result() would block until the load finishes
    trim_memory()  # helper (not shown in this snippet) that asks the allocator to release memory

def split_dataframe_by_size(df, chunk_size_mb=512):
    # Estimate the in-memory size of the partition and split it into ~chunk_size_mb pieces
    total_size = df.memory_usage(deep=True).sum()
    chunk_size = chunk_size_mb * 1024 * 1024  # convert chunk size from MB to bytes
    num_chunks = int(total_size / chunk_size) + 1
    df_to_iterate = np.array_split(df, num_chunks)
    return df_to_iterate

def upload_from_dask(df_partitioned):
    df_iterator = split_dataframe_by_size(df_partitioned)
    print(f'DataFrame split into {len(df_iterator)} chunks of ~512 MiB each')
    # Upload the chunks concurrently from inside the Dask task
    with ThreadPoolExecutor() as executor:
        list(executor.map(lambda chunk: upload_dataframe_to_bq(chunk, table_id, job_config), df_iterator))

def run():

    print('PIPELINE STARTED...')
    t1_start = perf_counter()

    extract = ExtractPostgreSQL(username=username, password=password, host=host, port=port,
                                database=database, dialect=dialect, driver=driver, table=table_name)

    df_to_cast = extract.get_meta_rawdf()

    extract.cast_columns_to_floats(df_raw=df_to_cast)
    extract.cast_columns_to_strings(df_raw=df_to_cast)
    extract.cast_columns_to_bool(df_raw=df_to_cast)
    extract.cast_columns_to_datetime(df_raw=df_to_cast)

    df_to_cast.drop(labels='id', axis=1, inplace=True)

    print('Cast ready')

    df = dd.read_sql_table(table_name=table_name, index_col='id',
                           con=f'{dialect}+{driver}://{username}:{password}@{host}:{port}/{database}',
                           meta=df_to_cast, bytes_per_chunk='500 MiB')
    df = df.reset_index()
    df.columns = df.columns.str.replace('index', 'id')
    df['id'] = df['id'].astype(int)

    print('Pulled down ready')

    print('Uploading started')

    try:
        df.map_partitions(lambda d: upload_from_dask(df_partitioned=d), meta=df_to_cast).compute()
    except StopIteration:
        pass

    t1_finish = perf_counter()
    print('PIPELINE FINISHED.')

    print(f'Pipeline finished in {round(t1_finish - t1_start, 2)} seconds')

def main():

    run()

if __name__ == '__main__':
    main()

This is my dashboard when this error appears:

2023-04-10 14:53:03,544 - distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see Worker Memory Management — Dask.distributed 2023.11.0+22.gdc06ce4 documentation for more information. – Unmanaged memory: 2.56 GiB – Worker memory limit: 3.10 GiB

This is what the task graph shows: after some time it crashes and nothing gets processed.

The tricky thing here is that when I build a CUDACluster with 1 worker, it does not crash and everything works, but it takes too long (about 1 hour to process a 10 GB table).

When I shift to a LocalCluster with 3 threads per worker and 4 workers, it crashes!

From an external point of view, the code in upload_from_dask and split_dataframe_by_size is not needed. I’m not sure what it prints, but you already specify 500 MiB per chunk in the Dask DataFrame, so there is no point in splitting the partitions again later on, and it’s not a good idea to use a ThreadPool inside Dask. Just use Dask! You shouldn’t have to do this.

Then, a 500 MiB chunk is already quite big, especially when using worker processes with only 3 GiB of memory. I’m guessing that LocalCUDACluster works just because it probably runs only one process, with few threads and a lot more memory for that one process.

In the LocalCluster case, each worker has 3 GiB of memory shared by 3 threads. One chunk of data per thread already takes half the memory of the worker. If the memory is not released right after a chunk is inserted into BigQuery, then reading another chunk per thread will just overwhelm the worker.

So I would recommend to:

  • Rely only on Dask for chunking and parallelizing things.
  • Use smaller chunks; a default of 128 MiB is often a good start.
  • If necessary, use fewer workers and more threads, or try to tune these parameters (see the sketch below).

You should ensure that at some point memory is released and does not stay as unmanaged memory forever.
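
To make that concrete, here is a rough sketch of what I mean; the worker counts, memory limit and chunk size are only starting points to tune, and connection_uri stands for the SQLAlchemy URI used above:

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

# One thread per worker process, so only one chunk per process is being read/uploaded at a time
cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit='3GiB')
client = Client(cluster)

df = dd.read_sql_table(
    table_name=table_name,
    index_col='id',
    con=connection_uri,          # placeholder for the SQLAlchemy URI
    meta=df_to_cast,
    bytes_per_chunk='128 MiB',   # smaller chunks than the 500 MiB used before
)

# Then map the upload over partitions and let Dask alone handle the parallelism,
# without the extra ThreadPoolExecutor or np.array_split step.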