Hello,
I’m a data scientist/data engineer working on a workflow where I need to process a huge number of documents.
The basic idea is to open a large collection of documents, where each document holds an arbitrary number of pages.
We then apply a function to each page. The processing time per page varies, and there are far more pages than documents (20M pages for 400,000 documents).
I tried Dask for this pipeline, experimented with several ways of writing it, and settled on the task-within-tasks design pattern:
import random
from itertools import chain
from time import sleep

from distributed import LocalCluster, Client, Queue, wait, worker_client


def ocr_image(page):
    timeDelay = random.randrange(1, 10)
    sleep(timeDelay)  # simulate actual ocr work
    return "this is ocr"


def load_pages(doc):
    # simulate opening the file
    sleep(0.5)
    futures = []
    n = random.randint(1, 5)
    n = 30  # override with a fixed page count for testing
    # submit one ocr task per page from inside the loader task
    with worker_client() as client:
        for page in range(n):
            future_ocr = client.submit(ocr_image, page, pure=False)
            futures.append(future_ocr)
    return futures


def main():
    # client and filenames are created in the driver (see the setup sketch below)
    loaders = [client.submit(load_pages, doc, pure=False) for doc in filenames]
    res_loaders = client.gather(loaders)
    # flatten the lists of page futures and gather the ocr results
    res_ocr = client.gather(list(chain.from_iterable(res_loaders)))
    return res_ocr
The issue with this approach is having to schedule a LOT of small tasks, so I thought about batching, but the problem there is the arbitrary number of pages per document (from 1 page to 40,000!). A rough sketch of what I mean by batching is below.
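To make the batching idea concrete, here is a minimal sketch, assuming load_pages is rewritten to return page references (say, (doc, page_index) tuples) instead of submitting futures, and using toolz.partition_all to cut the flattened page list into even chunks; ocr_batch is an illustrative name. The price is a full synchronisation barrier before any OCR starts:

from toolz import partition_all


def ocr_batch(pages):
    # process a fixed-size chunk of pages, ignoring document boundaries
    return [ocr_image(page) for page in pages]


def main():
    # collect every page reference first, then chunk them evenly
    loaders = [client.submit(load_pages, doc, pure=False) for doc in filenames]
    all_pages = list(chain.from_iterable(client.gather(loaders)))
    batches = [client.submit(ocr_batch, list(chunk), pure=False)
               for chunk in partition_all(batch_size, all_pages)]
    return list(chain.from_iterable(client.gather(batches)))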
The ‘saner’ approach would be a distributed producer/consumer architecture with a queue of pages that we can consume.
Luckily, I found the distributed.Queue class, but it has some issues (I know it’s experimental right now).
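For reference, the snippet below (and main above) assumes driver-side setup along these lines; filenames and batch_size are placeholders for my real inputs:

cluster = LocalCluster()  # my real cluster is bigger; this is just for the example
client = Client(cluster)
q = Queue()  # shared page queue
filenames = [f"doc_{i}" for i in range(100)]  # placeholder document list
batch_size = 8  # placeholder batch size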
def batch_ocr_image():
    # You can't have a batch size and a timeout at the same time
    # pages = [q.get(timeout='1s') for _ in range(batch_size)]
    pages = q.get(batch=batch_size)
    for _ in range(batch_size):
        timeDelay = random.randrange(1, 10)
        sleep(timeDelay)  # simulate actual ocr work
    return ["this is ocr"] * batch_size


def ocr_image():
    page = q.get(timeout='1s')
    timeDelay = random.randrange(1, 10)
    sleep(timeDelay)  # simulate actual ocr work
    return "this is ocr"


def load_pages(doc):
    # simulate opening the file
    sleep(0.5)
    n = random.randint(1, 5)
    n = 10  # override with a fixed page count for testing
    for page in range(n):
        q.put(page)
    return n


def main():
    ## Load pages into the queue
    loaders = [client.submit(load_pages, doc, pure=False) for doc in filenames]

    # Sync 1: gather loaders
    # approach 1: wait for all loaders to finish
    # res_loaders = client.gather(loaders)
    # approach 2: wait for the first one, then submit consumers
    loaders = wait(loaders, return_when='FIRST_COMPLETED')

    ## Batching
    # Batching is very hard: q.qsize() will fail here
    consumers = [client.submit(batch_ocr_image, pure=False, retries=4)
                 for _ in range(q.qsize() // batch_size)]

    # Sync 2: consume the queue
    res_consumer = client.gather(consumers)
    return loaders, res_consumer
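One workaround I considered, since q.qsize() is unreliable at that point: load_pages already returns its page count, so the consumer pool can be sized from the gathered counts instead. A rough sketch (it reintroduces a full barrier on the loaders, and the final partial batch would block in q.get):

def main():
    loaders = [client.submit(load_pages, doc, pure=False) for doc in filenames]
    counts = client.gather(loaders)  # each load_pages returns its page count
    total_pages = sum(counts)
    n_batches = -(-total_pages // batch_size)  # ceiling division
    # note: the final batch blocks in q.get if total_pages
    # isn't a multiple of batch_size
    consumers = [client.submit(batch_ocr_image, pure=False, retries=4)
                 for _ in range(n_batches)]
    return client.gather(consumers)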
I might be missing something about how to correctly implement the producer/consumer pattern with distributed. I have just submitted a feature request for some missing methods on the queue.
Thanks a lot for your help and guidance!