Observation
I am currently experimenting with vertical scaling using Dask from inside an Airflow task. Registering a WorkerPlugin seems to conflict with the way Airflow imports DAG files (the "module import magic" that loads them under a generated module name), and the run fails with:
AIRFLOW__CORE__DAGS_FOLDER=/path/to/code python -m airflow dags test minimal_dask_example
Exception: ModuleNotFoundError("No module named 'unusual_prefix_8a11fd3e5b7b8ac9fe035c8b5237a362e830f325_dag'")
Question
Is it possible, and does it make sense, to work around Airflow's module import magic? Is there a better way to run this Dask code than starting a new interpreter process with subprocess.run()?
Related
Code
from datetime import datetime
import dask.dataframe as dd
import pandas as pd
from airflow import DAG
from airflow.decorators import task
from dask.distributed import Client, WorkerPlugin
class VariableSetupPlugin(WorkerPlugin):
    """Worker plugin that carries a value and attaches it to a worker.

    The value given to the constructor is kept on the plugin instance and,
    when ``setup`` runs, assigned to the worker's ``my_shared_data``
    attribute.
    """

    def __init__(self, data):
        """Remember *data* so that ``setup`` can hand it to a worker later."""
        self.data = data

    def setup(self, worker):
        """Expose the stored value on *worker* as ``my_shared_data``."""
        worker.my_shared_data = self.data
# Declare the DAG; the decorated task below is defined and instantiated
# inside this context.
with DAG(
    "minimal_dask_example",
    start_date=datetime(2021, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    @task()
    def dask_in_task():
        """Register the worker plugin on a Dask client and average a small frame.

        Builds a 100-row pandas DataFrame with columns A, B and C, opens a
        ``Client`` requested with two worker processes, registers a
        ``VariableSetupPlugin`` carrying a small dict, and then computes the
        column means through a four-partition Dask DataFrame. The computed
        result is discarded and nothing is returned.
        """
        shared_payload = {"key": "value"}
        columns = {"A": range(100), "B": range(100, 200), "C": range(200, 300)}
        frame = pd.DataFrame(columns)

        with Client(n_workers=2, processes=True) as cluster_client:
            # The plugin instance is what gets shipped to the workers.
            plugin = VariableSetupPlugin(shared_payload)
            cluster_client.register_worker_plugin(plugin)

            # Convert the pandas frame to a Dask DataFrame and run the mean.
            dask_frame = dd.from_pandas(frame, npartitions=4)
            dask_frame.mean().compute()

    # Instantiate the single task inside the DAG context.
    start_task = dask_in_task()
Traceback
[2024-10-17T21:14:24.473+0200] {dag.py:3090} ERROR - Task failed; ti=<TaskInstance: minimal_dask_example.dask_anyone manual__2024-10-17T19:14:22.165911+00:00 [failed]>
Traceback (most recent call last):
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/dag.py", line 3083, in test
_run_task(
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/dag.py", line 4400, in _run_task
ti._run_raw_task(session=session, raise_on_defer=inline_trigger, mark_success=mark_success)
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/utils/session.py", line 94, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3004, in _run_raw_task
return _run_raw_task(
^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
TaskInstance._execute_task_with_callbacks(
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3158, in _execute_task_with_callbacks
result = self._execute_task(context, task_orig)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3182, in _execute_task
return _execute_task(self, context, task_orig)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/decorators/base.py", line 266, in execute
return_value = super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/operators/python.py", line 238, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/operators/python.py", line 256, in execute_callable
return runner.run(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/dag/dag.py", line 32, in dask_anyone
client.register_worker_plugin(VariableSetupPlugin(my_data))
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/client.py", line 5404, in register_worker_plugin
return self.sync(method, plugin=plugin, name=name, idempotent=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/utils.py", line 364, in sync
return sync(
^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/utils.py", line 440, in sync
raise error
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/utils.py", line 414, in f
result = yield future
^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
value = future.result()
^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/client.py", line 5240, in _register_worker_plugin
responses = await self.scheduler.register_worker_plugin(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/core.py", line 1256, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/core.py", line 1040, in send_recv
raise exc.with_traceback(tb)
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/core.py", line 831, in _handle_comm
result = await result
^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/scheduler.py", line 8077, in register_worker_plugin
responses = await self.broadcast(
^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/scheduler.py", line 6673, in broadcast
results = await All([send_message(address) for address in addresses])
^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/utils.py", line 257, in All
result = await tasks.next()
^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/scheduler.py", line 6651, in send_message
resp = await send_recv(
^^^^^^^^^^^^^^^^^
File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/core.py", line 1042, in send_recv
raise Exception(response["exception_text"])
^^^^^^^^^^^^^^^^^
Exception: ModuleNotFoundError("No module named 'unusual_prefix_8a11fd3e5b7b8ac9fe035c8b5237a362e830f325_dag'")
[2024-10-17T21:14:24.476+0200] {dagrun.py:823} ERROR - Marking run <DagRun minimal_dask_example @ 2024-10-17 19:14:22.165911+00:00: manual__2024-10-17T19:14:22.165911+00:00, state:running, queued_at: None. externally triggered: False> failed
Dag run in failure state
Dag information:minimal_dask_example Run id: manual__2024-10-17T19:14:22.165911+00:00 external trigger: False
Failed with message: task_failure