What I’m trying to find is a “shared array between processes that needs to be writable by any process”.
Could someone show me how to do that?
Hi @green,
Welcome!
Could you elaborate a bit more on what you’re trying to do? Possibly with an example. Thanks!
dask is software that can indeed share an array between processes, except that in general the array does not occupy a contiguous region of memory. Instead, it is composed of several separate arrays (called chunks), and dask automatically assigns each chunk to one or more processes. Each chunk is writable by the process(es) it is assigned to.
For more information, see the documentation at Dask Arrays — Dask Examples documentation
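Here is a minimal sketch of the chunked layout described above, assuming dask and NumPy are installed. The 6x6 size and the 3x3 chunk shape are arbitrary choices for illustration. `map_blocks` runs a function once per chunk, and different workers can execute those per-chunk tasks. Note that it returns a new dask array rather than modifying `x` in place.

```python
import dask.array as da
import numpy as np

# A 6x6 array stored as four separate 3x3 NumPy chunks.
x = da.from_array(np.arange(36).reshape(6, 6), chunks=(3, 3))
print(x.chunks)     # ((3, 3), (3, 3))
print(x.numblocks)  # (2, 2)

# One task per chunk; the result is a new (lazy) dask array.
y = x.map_blocks(lambda block: block * 10)

# compute() gathers the chunks back into one contiguous NumPy array.
result = y.compute()
print(result[0, :3])  # [ 0 10 20]
```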
I’m new to all this. What I’m trying to do is implement the producer-consumer problem in dask.
My code is not very good, but I’m pasting it here anyway:
##################################
from dask.distributed import Client
client = Client(n_workers=6)
##################################
import random
import time

from dask import delayed
from dask.distributed import Lock


class Shared(object):
    def __init__(self, val=0):
        self.queue = []
        # dask.distributed lock; needs the Client created above
        self.lock = Lock()

    def put(self):
        # "with" releases the lock even if an exception is raised
        with self.lock:
            self.queue.append(random.randrange(99999999999999, 99999999999999999))

    def get(self):
        with self.lock:
            return self.queue.pop()

    def length(self):
        return len(self.queue)


def produce(s):
    for i in range(100):
        s.put()
    return s.length()


def consume(s):
    for i in range(10):
        if s.length():
            s.get()
        time.sleep(5)
    return s.get()


if __name__ == '__main__':
    shared = Shared()
    producer = delayed(produce)(shared)
    print(producer.compute())
    consumer = delayed(consume)(shared)
    print(consumer.compute())
Hi @green,
In your code, the variables producer and consumer see different instances of Shared(). In other words, consume() does not see the random numbers generated by produce(). When you call compute(), dask serializes shared and sends a copy of it to the worker that runs the task. produce() fills that copy, and only its return value, s.length(), comes back to you.
I suggest that you amend produce() and consume() so that they return s instead of s.length() and s.get(), respectively. Then replace the last few lines of your code with the following:
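if __name__ == '__main__':
    shared = Shared()
    producer = delayed(produce)(shared)
    # Keep the produced Shared object in cluster memory so later tasks can use it
    producer = producer.persist()
    # Pass the produced object (not the original `shared`) to the consumer
    consumer = delayed(consume)(producer)
    print(producer.compute().length())
    print(consumer.compute().length())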
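You can reproduce the effect described above without a cluster. The sketch below is a simplified model, not dask itself. The hypothetical helper `run_on_worker` stands in for shipping a task to a worker by giving the function a deep copy of its argument, which mimics the copy a worker gets after serialization. The lock and the `time.sleep` calls are omitted. The sketch shows that returning `s.length()` leaves the original `shared` empty, while returning `s` lets the filled object flow into the consumer.

```python
import copy
import random


class Shared:
    """Simplified version of the Shared class from the question (no lock)."""

    def __init__(self):
        self.queue = []

    def put(self):
        self.queue.append(random.randrange(99999999999999, 99999999999999999))

    def get(self):
        return self.queue.pop()

    def length(self):
        return len(self.queue)


def run_on_worker(func, arg):
    # Hypothetical stand-in for a remote task: func only sees a copy of arg.
    return func(copy.deepcopy(arg))


def produce_length(s):
    # Original version: fills the copy, then returns only its length.
    for _ in range(100):
        s.put()
    return s.length()


def produce(s):
    # Amended version: returns the filled object itself.
    for _ in range(100):
        s.put()
    return s


def consume(s):
    # Amended version: takes up to 10 items, then returns the object.
    for _ in range(10):
        if s.length():
            s.get()
    return s


shared = Shared()
print(run_on_worker(produce_length, shared))  # 100
print(shared.length())                        # 0: the original was never filled

filled = run_on_worker(produce, shared)
drained = run_on_worker(consume, filled)
print(filled.length(), drained.length())      # 100 90
```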