How can I continuously process results from a single order stream?

Apparently articulating the problem helps: I pasted my question into Claude and, with a few revisions, was able to get the following:

import time
import random
from dask.distributed import Client
from collections import deque

def continuous_recording_stream(delay=1, low=1, high=10):
    """Simulate one reading from a slow input stream.

    Draws two random integers in ``[low, high]``, prints them together with
    their sum, blocks for ``delay`` seconds, and returns the sum.

    Args:
        delay: Seconds to sleep before returning. Defaults to 1, the pace
            the stream was originally hard-coded to.
        low: Smallest value either integer can take (inclusive).
        high: Largest value either integer can take (inclusive).

    Returns:
        The sum of the two integers that were drawn.
    """
    x = random.randint(low, high)
    y = random.randint(low, high)
    total = x + y
    print("received", x, y, f"output {total}")
    time.sleep(delay)
    return total

def process_output(result):
    """Block for a third of ``result`` seconds, then return ``result`` as text.

    The sleep stands in for work whose duration grows with the input, so
    larger readings take longer to finish than smaller ones.
    """
    pause_seconds = result / 3
    time.sleep(pause_seconds)
    return format(result)

def main():
    """Feed stream readings to Dask workers and report results in order.

    Creates a ``dask.distributed.Client`` with default arguments, then loops
    forever: read one value from ``continuous_recording_stream``, submit
    ``process_output`` for it, and append every already-completed result at
    the head of the queue to the running ``results`` string. Results are
    always consumed in submission order, so a slow task holds back the
    faster ones queued behind it.

    Ctrl-C (``KeyboardInterrupt``) stops the loop; the remaining futures are
    then awaited, the final string is printed, and the client is closed.

    Raises:
        Exception: Whatever a failed task raised, re-raised by
            ``Future.result()`` as soon as that task reaches the head of
            the queue.
    """
    results = ""
    queue = deque()

    # The context manager closes the client on every exit path, including
    # when draining the queue below raises.
    with Client() as client:
        # `display` is only injected by IPython/Jupyter; fall back to a plain
        # print so the script also runs from a regular interpreter.
        try:
            display(client)
        except NameError:
            print(client)

        try:
            while True:
                result = continuous_recording_stream()

                # Submit new task to the client and add future to the queue
                future = client.submit(process_output, result)
                queue.append(future)

                # Process all ready tasks. done() is also true for errored or
                # cancelled futures, so a failed task surfaces through
                # result() instead of blocking the queue forever.
                while queue and queue[0].done():
                    results += f"-{queue.popleft().result()}"

                print(f"Results: {results}")

        except KeyboardInterrupt:
            print("Stopping the process...")
        finally:
            # Process any remaining tasks (result() blocks until each is done)
            while queue:
                results += f"-{queue.popleft().result()}"
            print(f"Final Results: {results}")

# Run the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
1 Like