WorkerPlugin in Airflow: No module named 'unusual_prefix_*'

Observation

I am experimenting with vertical scaling using Dask from inside an Airflow task. Registering a WorkerPlugin seems to clash with the way Airflow imports DAG files, which it loads under generated module names:

 AIRFLOW__CORE__DAGS_FOLDER=/path/to/code python -m airflow dags test minimal_dask_example
Exception: ModuleNotFoundError("No module named 'unusual_prefix_8a11fd3e5b7b8ac9fe035c8b5237a362e830f325_dag'")

Question

Is it possible, and does it make sense, to work around Airflow's module import magic? Is there a better way to run this Dask code than starting a new interpreter process with subprocess.run()?

Code

from datetime import datetime

import dask.dataframe as dd
import pandas as pd
from airflow import DAG
from airflow.decorators import task
from dask.distributed import Client, WorkerPlugin


class VariableSetupPlugin(WorkerPlugin):
    def __init__(self, data):
        self.data = data

    def setup(self, worker):
        # Set the variable on the worker
        worker.my_shared_data = self.data


with DAG(
    "minimal_dask_example",
    schedule="@daily",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:

    @task()
    def dask_in_task():
        my_data = {"key": "value"}
        df = pd.DataFrame({"A": range(100), "B": range(100, 200), "C": range(200, 300)})

        with Client(n_workers=2, processes=True) as client:
            client.register_worker_plugin(VariableSetupPlugin(my_data))

            # Convert to a Dask DataFrame
            ddf = dd.from_pandas(df, npartitions=4)
            ddf.mean().compute()

    # Instantiate the task so that it is registered with the DAG
    start_task = dask_in_task()

Traceback

[2024-10-17T21:14:24.473+0200] {dag.py:3090} ERROR - Task failed; ti=<TaskInstance: minimal_dask_example.dask_anyone manual__2024-10-17T19:14:22.165911+00:00 [failed]>
Traceback (most recent call last):
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/dag.py", line 3083, in test
    _run_task(
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/dag.py", line 4400, in _run_task
    ti._run_raw_task(session=session, raise_on_defer=inline_trigger, mark_success=mark_success)
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3004, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3158, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3182, in _execute_task
    return _execute_task(self, context, task_orig)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/decorators/base.py", line 266, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/operators/python.py", line 256, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/dag/dag.py", line 32, in dask_anyone
    client.register_worker_plugin(VariableSetupPlugin(my_data))
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/client.py", line 5404, in register_worker_plugin
    return self.sync(method, plugin=plugin, name=name, idempotent=False)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/utils.py", line 364, in sync
    return sync(
           ^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/utils.py", line 440, in sync
    raise error
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/utils.py", line 414, in f
    result = yield future
             ^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/client.py", line 5240, in _register_worker_plugin
    responses = await self.scheduler.register_worker_plugin(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/core.py", line 1256, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/core.py", line 1040, in send_recv
    raise exc.with_traceback(tb)
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/core.py", line 831, in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/scheduler.py", line 8077, in register_worker_plugin
    responses = await self.broadcast(
    ^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/scheduler.py", line 6673, in broadcast
    results = await All([send_message(address) for address in addresses])
    ^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/utils.py", line 257, in All
    result = await tasks.next()
    ^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/scheduler.py", line 6651, in send_message
    resp = await send_recv(
    ^^^^^^^^^^^^^^^^^
  File "/Users/christianroth/repos/airflow_dask/.pixi/envs/default/lib/python3.12/site-packages/distributed/core.py", line 1042, in send_recv
    raise Exception(response["exception_text"])
    ^^^^^^^^^^^^^^^^^
Exception: ModuleNotFoundError("No module named 'unusual_prefix_8a11fd3e5b7b8ac9fe035c8b5237a362e830f325_dag'")
[2024-10-17T21:14:24.476+0200] {dagrun.py:823} ERROR - Marking run <DagRun minimal_dask_example @ 2024-10-17 19:14:22.165911+00:00: manual__2024-10-17T19:14:22.165911+00:00, state:running, queued_at: None. externally triggered: False> failed
Dag run  in failure state
Dag information:minimal_dask_example Run id: manual__2024-10-17T19:14:22.165911+00:00 external trigger: False
Failed with message: task_failure

Hi @ChristianRothQC, welcome to the Dask community,

Well, this looks more like an Airflow problem… Do the other parts of the code work? Do you really need a plugin? There are other ways to pass data to workers than using a plugin.
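
One such alternative, as a minimal sketch rather than a recommendation for this exact setup: scatter the data once and pass the resulting future to each task as an ordinary argument. The names `tag_value` and `run_with_scatter` are hypothetical and not taken from the original code. Inside Airflow, functions defined in the DAG file are likely to hit the same import error as the plugin class, so this sketch assumes they live in a separate importable module.

```python
def tag_value(x, shared):
    """Combine a task input with the shared data passed in as an argument."""
    return x, shared["key"]


def run_with_scatter(n_tasks=4):
    # Imported lazily so tag_value stays usable without Dask installed.
    from dask.distributed import Client

    shared = {"key": "value"}
    with Client(n_workers=2, processes=True) as client:
        # Wrap the dict in a list: scatter() treats a bare dict as several
        # named items and would scatter each value separately.
        [shared_future] = client.scatter([shared], broadcast=True)
        # Futures passed as arguments are resolved to their data on the worker.
        futures = [client.submit(tag_value, x, shared_future) for x in range(n_tasks)]
        return client.gather(futures)
```

Calling `run_with_scatter()` from inside the task body would replace the `register_worker_plugin` call. Each task then receives the shared dictionary explicitly instead of reading it from a worker attribute.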

Hi, and thanks for the quick answer. I played around with it a bit more and agree that this is an Airflow particularity rather than a Dask problem. The issue only appears if you attempt to pickle objects whose classes are defined in the same file that defines the DAG. As long as all the classes of the pickled objects are imported from another module, pickling works :slight_smile:
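
The mechanism can be reproduced with the standard library alone. The sketch below is an illustration, not Airflow's or Dask's actual code path: it assumes a made-up module name (`unusual_prefix_0000_dag`) as a stand-in for the name Airflow generates, defines the plugin class in a module that exists only in the current process, and then simulates a worker by removing that module before unpickling.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the name Airflow gives a DAG file when it loads it.
FAKE_DAG_MODULE = "unusual_prefix_0000_dag"

# Create a module that exists only in this process and define the class in it.
dag_module = types.ModuleType(FAKE_DAG_MODULE)
exec(
    "class VariableSetupPlugin:\n"
    "    def __init__(self, data):\n"
    "        self.data = data\n",
    dag_module.__dict__,
)
sys.modules[FAKE_DAG_MODULE] = dag_module

# Pickle stores a reference (module name + class name), not the class body.
payload = pickle.dumps(dag_module.VariableSetupPlugin({"key": "value"}))
module_name_in_payload = FAKE_DAG_MODULE.encode() in payload

# Simulate a worker process, where that module was never registered.
del sys.modules[FAKE_DAG_MODULE]
try:
    pickle.loads(payload)
    error = None
except ModuleNotFoundError as exc:
    error = exc

print(module_name_in_payload, repr(error))
```

Because the pickle only names the module, the receiving process has to be able to import it. Moving the class into a regular module that both the task and the workers can import (for example a file next to the DAG, assuming the DAGs folder is on the workers' import path) avoids the generated name entirely.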