KMeans fit() with an async Client

Is there a way to run KMeans.fit with an async Client?

I tried this:

from dask.distributed import Client
from dask_ml.cluster import KMeans

client = await Client(asynchronous=True)
client

k_means = KMeans(n_clusters=5, init="k-means++")
await k_means.fit(data[['height']]) # Also tried client.compute()

Is there a way to fit it without reimplementing the method to support the async client?
Thanks

Hi @JavierYepez, good question!

I adjusted your example to be copy-pastable:

import distributed
import dask_ml
import dask_ml.datasets
import dask_ml.cluster

client = distributed.Client(asynchronous=True)
await client

X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)

When I run this, I get the error

TypeError: 'coroutine' object is not iterable
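
For what it's worth, that message is plain Python behaviour and can be reproduced without Dask. The sketch below is only an illustration: `compute_async` is a hypothetical stand-in for any call that returns a coroutine when the client is asynchronous, and I am assuming (not having traced it) that something similar happens inside KMeans when it iterates over a result it has not awaited.

```python
import asyncio


async def compute_async():
    """Hypothetical stand-in for a call that returns a coroutine under an async client."""
    return [1, 2, 3]


def iterate_without_await():
    """Mimic synchronous code that iterates over the result without awaiting it."""
    coro = compute_async()  # not awaited, so this is a coroutine object, not a list
    try:
        return list(coro)
    except TypeError as exc:
        return str(exc)
    finally:
        coro.close()  # suppress the "coroutine was never awaited" warning


async def iterate_with_await():
    """The async-aware version awaits first and then iterates."""
    return list(await compute_async())


message = iterate_without_await()
values = asyncio.run(iterate_with_await())
print(message)  # 'coroutine' object is not iterable
print(values)   # [1, 2, 3]
```

In a notebook that already has a running event loop, replace `asyncio.run(iterate_with_await())` with `await iterate_with_await()`.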

I took a look at the dask-ml codebase (with which I am not very familiar), and it does seem like some estimators are able to handle async clients, e.g. here. But the same pattern is not repeated in KMeans.

You may want to open an issue on dask-ml requesting this as a feature. Perhaps @TomAugspurger could comment further on how difficult it would be to retrofit KMeans with this?
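
In the meantime, a general asyncio pattern for calling blocking code from async code is to push the blocking call onto a worker thread. The sketch below shows only that generic pattern: `blocking_fit` is a hypothetical placeholder, not a dask-ml function, and I have not verified that KMeans.fit behaves correctly this way. It would at least require the code in the thread to use a synchronous Client rather than the async one.

```python
import asyncio
import time


def blocking_fit(n_clusters):
    """Hypothetical placeholder for a synchronous, blocking call such as fit()."""
    time.sleep(0.1)  # simulate work that blocks the calling thread
    return f"fitted with {n_clusters} clusters"


async def main():
    # asyncio.to_thread (Python 3.9+) runs the blocking function in a worker
    # thread, so the event loop stays free while it waits for the result.
    return await asyncio.to_thread(blocking_fit, 3)


result = asyncio.run(main())
print(result)  # fitted with 3 clusters
```

In a notebook with a running event loop, use `await main()` instead of `asyncio.run(main())`.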

Ok, I will open an issue. Thanks!

Thanks @JavierYepez. The issue is now here.