I am working on a project that was initially optimised for the Python multiprocessing library, and I am now trying to use Dask to distribute the work over multiple nodes, in particular via SSHCluster.
In order to optimise as much as possible, I have made the worker methods more fine-grained, i.e. they operate at the smallest level possible, taking smaller inputs and returning smaller outputs. The goal is to use as little memory as possible while keeping the time to complete a single task as low as possible.
The data structures I have are as follows: a large dict whose values are integers, arrays and nested dicts, as well as some simpler dicts, arrays and sets. These are dynamic in nature, i.e. they are modified by the worker methods, returned to the client, and then used by subsequent calls to the same (and other) worker methods.
I also have a dict of dicts, which is static, along with some other objects that hold static data. I save these as JSON files, use the client.upload_file method to ship the files to the workers, and use the client.register_worker_callbacks method to load the files on the worker side, so that the worker methods can treat them as "global shared" memory. This uses up some memory, especially because the data is duplicated in the memory space of each worker, but it works quite well: the data is loaded once when the workers are created and is then shared by every subsequent worker method (task) computation.
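In case it helps, this is roughly the pattern I use for the static data (the file name and the attribute name are illustrative):

```python
import json
import os

# Ship the static JSON to each worker's local directory.
client.upload_file("static_lookup.json")

def load_static_data(worker):
    # Runs once per worker: parse the JSON once and hang it off the
    # worker object so every later task can reach it without re-reading.
    path = os.path.join(worker.local_directory, "static_lookup.json")
    with open(path) as f:
        worker.static_data = json.load(f)

client.register_worker_callbacks(load_static_data)

# Inside a worker method, the static data is then available via:
#     from dask.distributed import get_worker
#     static = get_worker().static_data
```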
However, the dynamic data (approx. 800 MB in size) needs to be passed to the workers as efficiently as possible before the computation of the worker method starts.
I came up with 3 potential ways to achieve this communication between the client and the workers:
- Split the data structures into "partial data structures" based on what each worker method requires. For example, if the worker method will be tackling persons A, B and C, only include the corresponding data for A, B and C, and afterwards only return the data for A, B and C (sketched after this list).
- Scatter these data structures to the workers using client.scatter(dynamic_obj, broadcast=True), pass the scattered data structures as futures to the workers (along with other small parameters), then build the "partial data structures" on the worker side for local use only, and return the results much like in option 1.
- Use Dask collections such as dask.bag or dask.array.
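Sketched with illustrative names, option 1 looks roughly like this, where `state` is the large dynamic dict keyed by person:

```python
def make_partial(state, person_ids):
    # Ship only the entries this task will actually touch.
    return {pid: state[pid] for pid in person_ids}

def worker_method(partial_state, person_ids):
    # ... mutate the partial copy locally ...
    return partial_state  # only the data for these persons comes back

batches = [["A", "B", "C"], ["D", "E", "F"]]  # illustrative batching
futures = [
    client.submit(worker_method, make_partial(state, batch), batch)
    for batch in batches
]
```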
The first option works; I am just not sure whether my memory consumption is optimal. For example, the client uses around 6.3 GB of memory, the scheduler around 2.8 GB, and the workers around 2.3 GB each. I am using the client.submit or client.map methods, evaluating the resulting futures with as_completed, and releasing each future as soon as I have consumed its result. While I think the client is warranted in using 6.3 GB of memory, I am not sure why the scheduler uses that much when I release the results so quickly. The workers seem to have a baseline memory footprint of around 1.9 GB, so 2.3 GB seems acceptable as "working memory" while tasks are ongoing.
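For reference, the evaluation loop looks roughly like this (again with illustrative names):

```python
from dask.distributed import as_completed

for future in as_completed(futures):
    partial_result = future.result()
    state.update(partial_result)  # fold the partial result back into the client-side state
    future.release()              # drop our reference so the cluster can forget the result
```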
The second option doesn’t work. When trying to call:
```python
client.scatter(dynamic_obj, broadcast=True)
```
on what I’ve described as “a large dict, with inner integer values, arrays and dicts”, I get:
```
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.scatter local=tcp://127.0.0.1:58858 remote=tcp://127.0.1.1:35767>: Stream is closed
```
after around 32 minutes. Could this be caused by the "nested dict" type of data structure? The collection is not even that large, and most of the inner dicts/arrays are empty at this point.
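If the single large message is the problem, one variant I could try (an illustrative sketch, not something I have verified) is scattering the top-level entries separately, so that no single serialized payload holds the whole 800 MB:

```python
# Scattering a dict scatters its values and returns a dict of futures,
# so each top-level entry travels as its own, smaller message.
scattered = client.scatter(
    {pid: dynamic_obj[pid] for pid in dynamic_obj},
    broadcast=True,
)

# Futures nested inside arguments are resolved on the worker side.
future = client.submit(
    worker_method,
    {pid: scattered[pid] for pid in ["A", "B", "C"]},
    ["A", "B", "C"],
)
```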
I am not sure about the third option, especially because, from the articles I have read and videos I have watched, Dask collections seem more useful for distributed computations over large data sets, of which I don't have many. The data structures I am using (and referring to above) are what I have found to be the most convenient representations of the results that the worker methods need to return. However, I was wondering whether, by using Dask collections, I could gain an automatic speed-up with regard to the data communication overhead. It would be great to be pointed in the right direction on this.
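For completeness, the kind of usage I have in mind for option 3 would be something like this (illustrative only, assuming one bag element per person):

```python
import dask.bag as db

# One element per person; each task then works on a single small record.
bag = db.from_sequence(list(dynamic_obj.items()), npartitions=8)  # e.g. ~one partition per worker

def process(item):
    pid, record = item
    # ... update record ...
    return pid, record

results = dict(bag.map(process).compute())
```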