Is it possible to write to Kafka from the workers?

Hi everyone!

I would like to write directly to Kafka from the Dask workers. I am currently experimenting and have found one solution which creates one producer per task. I am wondering if it is possible to create one producer per worker instead?
All I found was Write to Kafka from a Dask worker - Stack Overflow, but get_worker() no longer seems to work (it throws a "no worker found" error).
Is it even good practice to use just one producer per worker, or is it better to create one producer per task?

Hi @siltsp, welcome to the Dask community!

I think the answer from Stack Overflow you found still stands:

You might also consider re-creating the producer in every task, then writing data, and then closing that producer. If it doesn’t take too long to create a producer relative to the amount of time it takes to write a partition of data then this might be a decent solution. It’s slightly less efficient, but probably more robust/secure/mature.
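
For illustration, the per-task variant could look roughly like this (a minimal sketch assuming confluent_kafka, a placeholder broker address, and JSON-serialized rows, so adapt it to your actual client and serialization):

from confluent_kafka import Producer

def write_partition_to_kafka(df, topic):
    # Create a short-lived producer inside the task
    producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder config
    for record in df.to_json(orient="records", lines=True).splitlines():
        producer.produce(topic, value=record)
    # Block until all buffered messages are delivered before the task finishes
    producer.flush()
    return df

dask_df = dask_df.map_partitions(write_partition_to_kafka, "my_topic", meta=dask_df)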

get_worker should work anyway; how did you try to use it?

You could also register a WorkerPlugin that would set a producer on startup and close it on teardown.
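
In rough outline, such a plugin could look like this (just a sketch, assuming confluent_kafka, a placeholder configuration, and that client is your Client instance):

from dask.distributed import WorkerPlugin
from confluent_kafka import Producer

class KafkaProducerPlugin(WorkerPlugin):
    def setup(self, worker):
        # Runs once on each worker when the plugin is registered (and on workers that join later)
        worker.kafka_producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder

    def teardown(self, worker):
        # Flush buffered messages before the worker shuts down
        worker.kafka_producer.flush()

client.register_plugin(KafkaProducerPlugin())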

Hi @guillaumeeb, and thanks for your response.

Yeah, I thought so too. I also believe that having one producer per task is the better solution. However, I would like to try both options :slight_smile:

I attempted to use it like this (short version):

from dask.distributed import get_worker
from confluent_kafka import Producer  # or whichever Kafka client provides Producer

def initialize_kafka_producer():
    # Configuration for Kafka Producer
    conf = {...}
    producer = Producer(**conf)
    worker = get_worker()  # this is the call that raises "No worker found"
    worker.kafka_producer = producer

def send_partition_to_kafka(df, topic):
    worker = get_worker()
    producer = getattr(worker, 'kafka_producer', None)

    if producer is not None:
        ... do stuff ...
    else:
        print("Kafka producer not found on worker.")
    return df 

client.run(initialize_kafka_producer)

kafka_topic = 'kafka_topic'

# Apply function to each partition
dask_df = dask_df.map_partitions(send_partition_to_kafka, kafka_topic, meta=dask_df)

When initialize_kafka_producer is called, I get the following error message:

File "/home/user/.local/lib/python3.11/site-packages/distributed/worker.py", line 2729, in get_worker
raise ValueError("No worker found") from None
^^^^^^^^^^^^^^^^^
Exception: ValueError('No worker found')

I will give the WorkerPlugin a try; hopefully, it will work!

That might be because run does not submit tasks; it just runs a function directly on each worker.
From the get_worker documentation:

Get the worker currently running this task

Also, one interesting part from the Client.run documentation:

If your function takes an input argument named dask_worker then that variable will be populated with the worker itself.

So I guess you could also use run by modifying your function.
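
For example, your setup function could be rewritten like this (a small sketch reusing the elided conf and Producer from your snippet):

def initialize_kafka_producer(dask_worker):
    # Client.run fills `dask_worker` with the Worker instance, so get_worker() is not needed
    conf = {...}
    dask_worker.kafka_producer = Producer(**conf)

client.run(initialize_kafka_producer)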

In the end, I used a WorkerPlugin to get one Producer per worker :slight_smile:
I did it like this:

from dask.distributed import Client, WorkerPlugin, get_worker
import dask.dataframe as dd
from confluent_kafka import Producer  # or whichever Kafka client provides Producer

class KafkaProducerPlugin(WorkerPlugin):
    def __init__(self):
        self.worker = None

    def setup(self, worker):
        self.worker = worker
        # Initialize Kafka producer
        conf = <Configuration>
        producer = Producer(**conf)
        # Attach the producer to the worker for later use
        self.worker.kafka_producer = producer


def send_to_kafka(df, topic):
    worker = get_worker()
    producer = getattr(worker, 'kafka_producer', None)
    if producer is not None:
        < do kafka stuff >
    else:
        raise Exception("Kafka producer not found on worker.")
    return df

# Setup Dask client
client = Client("<Scheduler Address>")

# Create and register the Kafka producer plugin
plugin = KafkaProducerPlugin()
client.register_plugin(plugin)

dask_df = dd.read_csv('<data>')  
kafka_topic = '<the topic>'

dask_df = dask_df.map_partitions(send_to_kafka, kafka_topic, meta=dask_df)
dask_df.compute()
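
(Note: the plugin above does not define a teardown; depending on the Kafka client, adding a teardown(self, worker) that flushes the producer, as in the sketch earlier in the thread, would avoid losing buffered messages when a worker shuts down.)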
