Dask AWS ECS Cluster vs AWS EC2 instance + LocalCluster

Hi,

I have a few questions about using a Dask distributed ECS cluster correctly for loading large data into memory (1Mn rows at a time) for model training.

Using a separate c5.24xlarge EC2 instance (96 cores & 192 GB RAM) and creating a LocalCluster within that instance (48 workers, 96 threads, processes=True), loading the above batch of 1Mn rows takes around 60 seconds.

Using an AWS ECS cluster with 48 workers and 96 threads, it takes ages to load the data.

  • Is there any way to choose between threads and processes in the ECS cluster?
  • Am I missing any important parameter?
  • What is the right scheduler configuration in this case?

The ECS cluster gives me more flexibility in choosing the right worker memory, etc.

Hi @hjain371,

It looks like you already have processes configured if you launch 48 workers: 1 worker = 1 process.

To help further, I think we would need a code snippet of what you are doing; maybe something is wrong in how you read the data, or something related. There is no reason a LocalCluster would be faster than a cluster created with dask-cloudprovider on equivalent resources.

With a c5.24xlarge EC2 machine:

import psutil
from dask.distributed import LocalCluster

# One worker process per physical core, two threads each
n_workers = int(psutil.cpu_count(logical=False))
threads_per_worker = 2
cluster = LocalCluster(
    n_workers=n_workers, threads_per_worker=threads_per_worker, processes=True
)
ECS cluster:

from dask_cloudprovider.aws import ECSCluster

task_role_arn = ***************
docker_image = **************

cluster = ECSCluster(
    image=docker_image,
    fargate_scheduler=True,
    fargate_workers=True,
    scheduler_cpu=4096,    # 4 vCPU for the scheduler
    scheduler_mem=8192,    # 8 GB for the scheduler
    scheduler_timeout="180 minutes",
    worker_cpu=2048,       # 2 vCPU per worker
    worker_mem=8192,       # 8 GB per worker
    worker_nthreads=2,
    n_workers=48,
    task_role_arn=task_role_arn,
)

Reading the data is pretty simple (leaving out intermediate steps):

import os

import dask.dataframe as dd
import numpy as np

train = dd.read_parquet(
    os.path.join(load_dir, "train.parquet"),
    calculate_divisions=True,
)
index = train.index.values

# Shuffle the indexes in each epoch before training
np.random.shuffle(index)

split_size = 1_000_000
index_batch = index[0:split_size]
data_batch = train.loc[index_batch].compute()

I am reading a big amount of data into memory and then training with a mini-batch of 64 to save time; the overall dataset is huge, close to 30Mn rows.

Lame question: if I use 1 worker with 48 threads, is it equivalent to using threads instead of processes?

Please correct me if it is because of the scheduler configuration; apart from that, I can't find any difference.

Where is this input data located?

I'm not sure if you realize that by doing this you almost don't use Dask at all? Is this what you want?

No, processes and threads will not behave the same. The advantage of threads is shared memory, but for a lot of use cases it is better to have process-based parallelism in Python.
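As a rough sketch of the difference (reusing the 48/96 numbers from your setup, purely as an illustration, not a recommendation):

from dask.distributed import LocalCluster

# Process-based parallelism: 48 single-threaded worker processes.
# Each worker has its own memory space, so tasks do not contend for the GIL.
proc_cluster = LocalCluster(n_workers=48, threads_per_worker=1, processes=True)

# Thread-based parallelism: one worker running 48 threads in a single process.
# Memory is shared between tasks, but pure-Python code contends for the GIL.
thread_cluster = LocalCluster(n_workers=1, threads_per_worker=48, processes=False)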

Also note that having one big instance with 48 local processes can be quite different from having 48 instances with only one worker per instance. The latter involves network traffic between different virtual machines.

Finally, with the workload as you show it, you're constantly pulling data out of the Dask cluster to build your batches. This will also be much faster with a LocalCluster on the same machine than with a cluster of workers distributed across a network!

The data is being loaded directly from AWS S3.

Yes. After that, we found that keeping data in memory and training in batches is a much faster approach, so we are taking a middle path of loading just 1Mn rows at a time.

Is there any way I can replicate the setup of one big machine + LocalCluster using an AWS ECS/EC2 cluster?
Earlier, I tried creating one big EC2 instance via EC2Cluster, but the performance is still really bad and doesn't match the numbers above. I find creating the instance this way much more convenient, as it skips the manual process of starting and stopping an EC2 instance.

No, there isn't. I think the problem you're facing comes from the fact that you pull data out of the cluster into your main process. If your main process is running on the same instance as the cluster, this can be fast (copying data from memory to memory), but if your main process runs on another instance (or worse, a local computer?), then this will be a really slow step, because it involves network transfer.

You should modify your workflow to compute your batch of data on the Dask cluster, not in your main process.
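A minimal sketch of what I mean, assuming the same train and index_batch variables as in your snippet: persist() keeps the selected rows in the workers' memory instead of gathering them to the client, and downstream steps can then run partition by partition on the cluster.

# Keep the 1Mn-row batch distributed in worker memory instead of pulling it
# into the client process with .compute().
data_batch = train.loc[index_batch].persist()

# Follow-up work can then stay on the cluster, e.g. counting rows per partition:
rows_per_partition = data_batch.map_partitions(len).compute()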

Normally, a computation like basic statistics or a count on your dataset, performed on the Dask cluster side, should take about the same amount of time with EC2Cluster as with LocalCluster.

Could you try the following workflow on both configurations and report back?

train = dd.read_parquet(
    os.path.join(load_dir, "train.parquet"),
    calculate_divisions=True,
)
len(train)

It's just to verify that loading and traversing the data is about the same speed in the two cases.
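For example, something like this (just a sketch, assuming the same load_dir as in your snippet) would give a single number to compare between the two clusters:

import os
import time

import dask.dataframe as dd

start = time.perf_counter()
train = dd.read_parquet(
    os.path.join(load_dir, "train.parquet"),
    calculate_divisions=True,
)
n_rows = len(train)  # triggers a Dask computation to count the rows
print(f"Counted {n_rows} rows in {time.perf_counter() - start:.1f}s")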

You are absolutely right in figuring this out. The issue is that after taking the data into memory, I am converting it to torch tensors for model training. If I keep the data in cluster memory, the input dataframe is a Dask DataFrame rather than pandas, so I would need to change the downstream tasks accordingly.
I will take your advice and try to compare the two configurations.


Figuring out a solution for your use case might not be simple. But right now, you're only using Dask to read your data in chunks and take a random sample out of it. Outside of this IO operation, all the computation is probably happening in your main process, where your Client is declared, without using Dask anymore.

I guess there are a few possibilities:

  • Just drop Dask: especially if you can't use a Dask DataFrame as input for torch model training, you should find a way to read your data and take a random sample using Pandas.
  • Use Dask to filter and get a random sample of your input data, building a Dask DataFrame with a single partition on each iteration of your training, and using map_partitions, or converting the data to Delayed, to call your model training. This way, the training will run on the Dask cluster side (see the sketch after this list).
  • Use some ML library that can take advantage of Dask DataFrames, which often means being able to fit a model on chunks of data in parallel. dask-ml can be useful, as can any Scikit-Learn based algorithm that implements a partial_fit method. But I don't think you can do that with PyTorch.
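A rough sketch of that second option, assuming the train and index_batch variables from earlier in the thread; train_torch_model is a hypothetical stand-in for your own training code:

import pandas as pd

# Select the batch and collapse it to a single partition so the whole batch
# lands on one worker.
batch = train.loc[index_batch].repartition(npartitions=1)

def fit_on_partition(pdf: pd.DataFrame) -> pd.DataFrame:
    # `pdf` is a plain pandas DataFrame here, so it can be converted to torch
    # tensors and fed to the model as usual.
    train_torch_model(pdf)  # hypothetical training helper
    return pd.DataFrame({"rows_trained": [len(pdf)]})

# The function runs on the worker holding the partition, i.e. on the cluster side.
result = batch.map_partitions(fit_on_partition, meta={"rows_trained": "int64"}).compute()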

I find the 2nd approach most useful in my scenario. The issue with taking a random sample is that it goes over the entire training data in each epoch; shuffling the indexes first and then taking them in slices does the job.

Does map_partitions start working on all the partitions at the same time, or is Dask smart enough to allocate resources across partitions?

Each CPU or thread in your Dask cluster will process one partition at a time. So if you've got 96 threads, 96 partitions will be processed in parallel.

But here I was proposing to reduce the dataset to one partition, so that you could launch your model training on this single partition. Clearly, this is not optimal.
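If it helps to see it concretely (assuming the train DataFrame from earlier): map_partitions creates one task per partition, and the cluster runs as many of those tasks concurrently as it has threads.

print(train.npartitions)  # number of tasks map_partitions will create

# One `len` task per partition; with 96 threads, up to 96 run at the same time.
rows_per_partition = train.map_partitions(len).compute()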

Here, if I call compute() on, say, 1Mn rows and keep the resulting pandas dataframe in memory, where will that memory reside: the scheduler or a worker? (Assuming I am using an EC2/ECS cluster in a Docker container.) I am asking this to understand how to define the right worker/scheduler memory.

When calling compute() on a DataFrame, the data is gathered from the workers' memory into the Client process memory. So the resulting Pandas DataFrame will be stored in the process on the machine where the Client was created, which is usually also where you created the ECSCluster object.
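To make the contrast concrete (a sketch using the same train and index_batch as above):

# compute(): the result is a pandas DataFrame gathered into the client process,
# so the machine running the Client needs enough RAM for the 1Mn rows.
local_batch = train.loc[index_batch].compute()

# persist(): the same selection stays spread across the workers' memory; the
# client only holds a lightweight Dask DataFrame pointing at it.
remote_batch = train.loc[index_batch].persist()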

Maybe you should take a look at some of the available tutorials to better understand Dask DataFrames and Dask in general; see:

https://tutorial.dask.org/

There are some self-paced tutorials, but you could also register for free or paid tutorials and courses by Anaconda, Quansight, or Coiled.
