Hi everyone! I’m trying to figure out whether I’m on the right track and whether I’m missing or ignoring something.
I’m trying to upload a Dask DataFrame, built from data pulled out of a PostgreSQL database, to a BigQuery table.
My goal is to parallelize the pull from Postgres, distribute the data across a cluster (I was thinking about using a CUDA cluster of GPUs), cast, clean and wrangle it, and then upload it to BigQuery.
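For the cluster part, this is roughly what I had in mind (just a sketch using dask_cuda, not wired into the pipeline yet):

```python
# Sketch only: spin up one Dask worker per local GPU and point a client at it.
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()     # one worker per visible GPU
dask_client = Client(cluster)    # subsequent dask operations run on this cluster
```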
I’ll list the steps of my approach, then share some code and the error it returns:
- Pull data from PostgreSQL using dd.read_sql_table(), passing the full SQLAlchemy URI and a meta argument so the schema is set directly (for meta I provide an empty dataframe with all columns as objects; see the sketch right after this list).
- Cast columns to floats, strings, datetime and boolean.
- Upload to a BigQuery table, inferring the schema with bigquery.LoadJobConfig(autodetect=True) and load_table_from_dataframe().
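For the first step, the meta frame and the read_sql_table call look roughly like this (a sketch with made-up column names and a placeholder connection string; in my real code the empty frame comes from a helper, as shown further down):

```python
import pandas as pd
import dask.dataframe as dd

# Hypothetical columns, only to show the shape of the meta frame:
# an empty DataFrame whose columns are all 'object', matching the Postgres table.
meta = pd.DataFrame({col: pd.Series(dtype='object')
                     for col in ['name', 'amount', 'created_at', 'active']})

df = dd.read_sql_table(
    table_name='my_table',                                       # placeholder table name
    con='postgresql+psycopg2://user:password@host:5432/dbname',  # placeholder SQLAlchemy URI
    index_col='id',                                              # column used to partition the reads
    meta=meta,                                                   # use this schema instead of sampling
)
```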
I’m aware that somewhere in my pipeline, when the data is uploaded, it has to be converted into Arrow data types, so I cast strings as UInt64 (I don’t know whether that’s right or not).
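One thing I thought of to debug this is checking what Arrow infers from a small pandas sample of the dataframe I build below (debugging sketch only; as far as I understand, load_table_from_dataframe converts to Arrow under the hood):

```python
import pyarrow as pa

sample = df.head(100)                     # .head() on a dask DataFrame returns a pandas DataFrame
arrow_table = pa.Table.from_pandas(sample)
print(arrow_table.schema)                 # the Arrow types inferred for each column
```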
This is my code:
```python
import dask.dataframe as dd
from google.cloud import bigquery
from google.oauth2 import service_account

# ExtractPostgreSQL and PATHS come from my own project code
extract = ExtractPostgreSQL(username=username, password=password, host=host, port=port,
                            database=database, dialect=dialect, driver=driver, table=table_name)
df_to_cast = extract.get_meta_rawdf()    # empty dataframe, all columns as object, used as meta
df_to_cast.drop(labels='id', axis=1, inplace=True)

df = dd.read_sql_table(table_name=table_name, index_col='id',
                       con=f'{dialect}+{driver}://{username}:{password}@{host}:{port}/{database}',
                       meta=df_to_cast)
df = df.fillna(0)
df = df.reset_index()
df.columns = df.columns.str.replace('index', 'id')

# cast columns (methods shown further down)
extract.cast_columns_to_floats(df_raw=df)
extract.cast_columns_to_strings(df_raw=df)
extract.cast_columns_to_datetime(df_raw=df)
extract.cast_columns_to_bool(df_raw=df)

# upload to BigQuery -- this is the call that raises the error below
credentials = service_account.Credentials.from_service_account_file(PATHS.GCP_KEYS)
client = bigquery.Client(project='datalakedesarrollo', credentials=credentials)
job_config = bigquery.LoadJobConfig(autodetect=True)
client.load_table_from_dataframe(dataframe=df, destination='datalakedesarrollo.dwh_testing_1.test6',
                                 job_config=job_config)
```
This is the error:
```
pyarrow.lib.ArrowInvalid: Could not convert Dask Series Structure:
npartitions=1
    string
       ...
Name: name, dtype: string
Dask Name: try_loc, 124 graph layers with type Series: did not recognize Python value type when inferring an Arrow data type
```
Some methods from the ExtractPostgreSQL class:
```python
def cast_columns_to_floats(self, df_raw: pd.DataFrame) -> None:
    # self.df_dtypes has one row per table column, with 'column_name' and 'data_type'
    list_of_numeric_cols = list(self.df_dtypes.loc[
        self.df_dtypes['data_type'].isin(['integer', 'bigint', 'uuid']), 'column_name'].values)
    for col in list_of_numeric_cols:
        df_raw[col] = df_raw[col].astype(float)

def cast_columns_to_strings(self, df_raw: pd.DataFrame) -> None:
    list_of_string_columns = list(self.df_dtypes.loc[
        self.df_dtypes['data_type'].isin(['text', 'character varying', 'jsonb']), 'column_name'].values)
    for col in list_of_string_columns:
        df_raw[col] = df_raw[col].astype('string')

def cast_columns_to_bool(self, df_raw: pd.DataFrame) -> None:
    list_of_bool_cols = list(self.df_dtypes.loc[
        self.df_dtypes['data_type'].isin(['boolean']), 'column_name'].values)
    for col in list_of_bool_cols:
        df_raw[col] = df_raw[col].astype(bool)
```
I came up with a possible alternative solution using map_partitions() and insert_rows_from_dataframe():
```python
try:
    df.map_partitions(lambda d: client.insert_rows_from_dataframe(
        table=table, dataframe=d, selected_fields=schema), meta=df_to_cast).compute()
    # gc.collect()
except StopIteration:
    pass
```
This approach worked, but when I tried it on big tables (40-50 GB) it crashed and I ran out of memory. I was thinking of defining a wrapper function and uploading the big tables partition by partition (one pandas DataFrame at a time) with map_partitions, roughly as sketched below, but I don’t know whether that is the best approach in terms of performance or speed.
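The wrapper I have in mind is roughly this (untested sketch; the destination table and job_config are the same ones as above, and on a real distributed cluster I suspect the BigQuery client would need to be created inside the function rather than captured from the outer scope):

```python
import pandas as pd

def upload_partition(pdf: pd.DataFrame) -> pd.DataFrame:
    # each partition arrives here as a plain pandas DataFrame
    job = client.load_table_from_dataframe(
        dataframe=pdf,
        destination='datalakedesarrollo.dwh_testing_1.test6',
        job_config=job_config,
    )
    job.result()  # block until this load job finishes
    # return only a tiny marker so the full partition isn't kept around
    return pd.DataFrame({'rows_uploaded': [len(pdf)]})

uploaded = df.map_partitions(upload_partition, meta={'rows_uploaded': 'int64'}).compute()
```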
I’m all ears and thanks in advance.